From a32906c7fd11c9a8554df2621a172353a523a9dd Mon Sep 17 00:00:00 2001 From: Nishant Date: Sat, 5 Dec 2015 11:32:30 +0530 Subject: [PATCH] Remove FilteredServerView --- .../io/druid/indexing/common/TaskToolbox.java | 2 - .../druid/indexing/test/TestServerView.java | 15 +- .../client/BatchServerInventoryView.java | 90 ++--------- .../BatchServerInventoryViewProvider.java | 2 +- .../FilteredBatchServerViewProvider.java | 50 ------ .../io/druid/client/FilteredServerView.java | 32 ---- .../client/FilteredServerViewProvider.java | 34 ---- .../FilteredSingleServerViewProvider.java | 50 ------ .../client/SingleServerInventoryProvider.java | 2 +- .../client/SingleServerInventoryView.java | 82 +--------- .../java/io/druid/guice/ServerViewModule.java | 4 - .../plumber/RealtimePlumberSchool.java | 1 - .../io/druid/client/BrokerServerViewTest.java | 3 +- .../client/CoordinatorServerViewTest.java | 3 +- .../client/BatchServerInventoryViewTest.java | 148 ++---------------- .../plumber/RealtimePlumberSchoolTest.java | 3 - 16 files changed, 31 insertions(+), 490 deletions(-) delete mode 100644 server/src/main/java/io/druid/client/FilteredBatchServerViewProvider.java delete mode 100644 server/src/main/java/io/druid/client/FilteredServerView.java delete mode 100644 server/src/main/java/io/druid/client/FilteredServerViewProvider.java delete mode 100644 server/src/main/java/io/druid/client/FilteredSingleServerViewProvider.java diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java index 55712afbf49..3dfc545dde6 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java @@ -28,12 +28,10 @@ import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.MonitorScheduler; -import io.druid.client.FilteredServerView; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.TaskActionClient; -import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; import io.druid.query.QueryRunnerFactoryConglomerate; diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java b/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java index b9185547e09..d281c8db492 100644 --- a/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java +++ b/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java @@ -19,34 +19,21 @@ package io.druid.indexing.test; -import com.google.api.client.util.Lists; import com.google.common.base.Predicate; import com.google.common.collect.Maps; import com.metamx.common.Pair; -import io.druid.client.FilteredServerView; import io.druid.client.ServerView; import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; -public class TestServerView implements FilteredServerView, ServerView.SegmentCallback +public class TestServerView implements ServerView.SegmentCallback { final ConcurrentMap, Executor>> callbacks = Maps.newConcurrentMap(); - @Override - public void registerSegmentCallback( - final Executor exec, - final ServerView.SegmentCallback callback, - final Predicate filter - ) - { - callbacks.put(callback, Pair.of(filter, exec)); - } - @Override public ServerView.CallbackAction segmentAdded( final DruidServerMetadata server, diff --git a/server/src/main/java/io/druid/client/BatchServerInventoryView.java b/server/src/main/java/io/druid/client/BatchServerInventoryView.java index 44c8f7a9fe0..35fc92325dc 100644 --- a/server/src/main/java/io/druid/client/BatchServerInventoryView.java +++ b/server/src/main/java/io/druid/client/BatchServerInventoryView.java @@ -21,42 +21,33 @@ package io.druid.client; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.collect.Iterables; import com.google.common.collect.MapMaker; import com.google.common.collect.Sets; import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.emitter.EmittingLogger; import io.druid.guice.ManageLifecycle; -import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; import java.util.Set; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; /** */ @ManageLifecycle -public class BatchServerInventoryView extends ServerInventoryView> implements FilteredServerView +public class BatchServerInventoryView extends ServerInventoryView> { private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class); final private ConcurrentMap> zNodes = new MapMaker().makeMap(); - final private ConcurrentMap> segmentPredicates = new MapMaker().makeMap(); - final private Predicate defaultFilter; @Inject public BatchServerInventoryView( final ZkPathsConfig zkPaths, final CuratorFramework curator, - final ObjectMapper jsonMapper, - final Predicate defaultFilter + final ObjectMapper jsonMapper ) { super( @@ -65,11 +56,10 @@ public class BatchServerInventoryView extends ServerInventoryView>(){} + new TypeReference>() + { + } ); - - Preconditions.checkNotNull(defaultFilter); - this.defaultFilter = defaultFilter; } @Override @@ -79,12 +69,8 @@ public class BatchServerInventoryView extends ServerInventoryView inventory ) { - Predicate predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values())); - // make a copy of the set and not just a filtered view, in order to not keep all the segment data in memory - Set filteredInventory = Sets.newHashSet(Iterables.filter(inventory, predicate)); - - zNodes.put(inventoryKey, filteredInventory); - for (DataSegment segment : filteredInventory) { + zNodes.put(inventoryKey, inventory); + for (DataSegment segment : inventory) { addSingleInventory(container, segment); } return container; @@ -95,22 +81,18 @@ public class BatchServerInventoryView extends ServerInventoryView inventory ) { - Predicate predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values())); - // make a copy of the set and not just a filtered view, in order to not keep all the segment data in memory - Set filteredInventory = Sets.newHashSet(Iterables.filter(inventory, predicate)); - Set existing = zNodes.get(inventoryKey); if (existing == null) { throw new ISE("Trying to update an inventoryKey[%s] that didn't exist?!", inventoryKey); } - for (DataSegment segment : Sets.difference(filteredInventory, existing)) { + for (DataSegment segment : Sets.difference(inventory, existing)) { addSingleInventory(container, segment); } - for (DataSegment segment : Sets.difference(existing, filteredInventory)) { + for (DataSegment segment : Sets.difference(existing, inventory)) { removeSingleInventory(container, segment.getIdentifier()); } - zNodes.put(inventoryKey, filteredInventory); + zNodes.put(inventoryKey, inventory); return container; } @@ -131,56 +113,4 @@ public class BatchServerInventoryView extends ServerInventoryView filter - ) - { - segmentPredicates.put(callback, filter); - registerSegmentCallback( - exec, new SegmentCallback() - { - @Override - public CallbackAction segmentAdded( - DruidServerMetadata server, DataSegment segment - ) - { - final CallbackAction action; - if(filter.apply(segment)) { - action = callback.segmentAdded(server, segment); - if (action.equals(CallbackAction.UNREGISTER)) { - segmentPredicates.remove(callback); - } - } else { - action = CallbackAction.CONTINUE; - } - return action; - } - - @Override - public CallbackAction segmentRemoved( - DruidServerMetadata server, DataSegment segment - ) - { - final CallbackAction action; - if(filter.apply(segment)) { - action = callback.segmentRemoved(server, segment); - if (action.equals(CallbackAction.UNREGISTER)) { - segmentPredicates.remove(callback); - } - } else { - action = CallbackAction.CONTINUE; - } - return action; - } - - @Override - public CallbackAction segmentViewInitialized() - { - return callback.segmentViewInitialized(); - } - } - ); - } } diff --git a/server/src/main/java/io/druid/client/BatchServerInventoryViewProvider.java b/server/src/main/java/io/druid/client/BatchServerInventoryViewProvider.java index 3d08c753f21..9a1ef188f00 100644 --- a/server/src/main/java/io/druid/client/BatchServerInventoryViewProvider.java +++ b/server/src/main/java/io/druid/client/BatchServerInventoryViewProvider.java @@ -47,6 +47,6 @@ public class BatchServerInventoryViewProvider implements ServerInventoryViewProv @Override public BatchServerInventoryView get() { - return new BatchServerInventoryView(zkPaths, curator, jsonMapper, Predicates.alwaysTrue()); + return new BatchServerInventoryView(zkPaths, curator, jsonMapper); } } diff --git a/server/src/main/java/io/druid/client/FilteredBatchServerViewProvider.java b/server/src/main/java/io/druid/client/FilteredBatchServerViewProvider.java deleted file mode 100644 index 190a031667a..00000000000 --- a/server/src/main/java/io/druid/client/FilteredBatchServerViewProvider.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.client; - -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Predicates; -import io.druid.server.initialization.ZkPathsConfig; -import io.druid.timeline.DataSegment; -import org.apache.curator.framework.CuratorFramework; - -import javax.validation.constraints.NotNull; - -public class FilteredBatchServerViewProvider implements FilteredServerViewProvider -{ - @JacksonInject - @NotNull - private ZkPathsConfig zkPaths = null; - - @JacksonInject - @NotNull - private CuratorFramework curator = null; - - @JacksonInject - @NotNull - private ObjectMapper jsonMapper = null; - - @Override - public BatchServerInventoryView get() - { - return new BatchServerInventoryView(zkPaths, curator, jsonMapper, Predicates.alwaysFalse()); - } -} diff --git a/server/src/main/java/io/druid/client/FilteredServerView.java b/server/src/main/java/io/druid/client/FilteredServerView.java deleted file mode 100644 index 38496db89a7..00000000000 --- a/server/src/main/java/io/druid/client/FilteredServerView.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.client; - -import com.google.common.base.Predicate; -import io.druid.timeline.DataSegment; - -import java.util.concurrent.Executor; - -public interface FilteredServerView -{ - public void registerSegmentCallback( - Executor exec, ServerView.SegmentCallback callback, Predicate filter - ); -} diff --git a/server/src/main/java/io/druid/client/FilteredServerViewProvider.java b/server/src/main/java/io/druid/client/FilteredServerViewProvider.java deleted file mode 100644 index 06e4a0aeb77..00000000000 --- a/server/src/main/java/io/druid/client/FilteredServerViewProvider.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.client; - - -import com.fasterxml.jackson.annotation.JsonSubTypes; -import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.google.inject.Provider; - -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FilteredBatchServerViewProvider.class) -@JsonSubTypes(value = { - @JsonSubTypes.Type(name = "legacy", value = FilteredSingleServerViewProvider.class), - @JsonSubTypes.Type(name = "batch", value = FilteredBatchServerViewProvider.class) -}) -public interface FilteredServerViewProvider extends Provider -{ -} diff --git a/server/src/main/java/io/druid/client/FilteredSingleServerViewProvider.java b/server/src/main/java/io/druid/client/FilteredSingleServerViewProvider.java deleted file mode 100644 index c7f2902665c..00000000000 --- a/server/src/main/java/io/druid/client/FilteredSingleServerViewProvider.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.client; - -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Predicates; -import io.druid.server.initialization.ZkPathsConfig; -import io.druid.timeline.DataSegment; -import org.apache.curator.framework.CuratorFramework; - -import javax.validation.constraints.NotNull; - -public class FilteredSingleServerViewProvider implements FilteredServerViewProvider -{ - @JacksonInject - @NotNull - private ZkPathsConfig zkPaths = null; - - @JacksonInject - @NotNull - private CuratorFramework curator = null; - - @JacksonInject - @NotNull - private ObjectMapper jsonMapper = null; - - @Override - public SingleServerInventoryView get() - { - return new SingleServerInventoryView(zkPaths, curator, jsonMapper, Predicates.alwaysFalse()); - } -} diff --git a/server/src/main/java/io/druid/client/SingleServerInventoryProvider.java b/server/src/main/java/io/druid/client/SingleServerInventoryProvider.java index 1a63607f10f..76d170a34af 100644 --- a/server/src/main/java/io/druid/client/SingleServerInventoryProvider.java +++ b/server/src/main/java/io/druid/client/SingleServerInventoryProvider.java @@ -47,6 +47,6 @@ public class SingleServerInventoryProvider implements ServerInventoryViewProvide @Override public ServerInventoryView get() { - return new SingleServerInventoryView(zkPaths, curator, jsonMapper, Predicates.alwaysTrue()); + return new SingleServerInventoryView(zkPaths, curator, jsonMapper); } } diff --git a/server/src/main/java/io/druid/client/SingleServerInventoryView.java b/server/src/main/java/io/druid/client/SingleServerInventoryView.java index 336a3087b04..3a51cf0dd96 100644 --- a/server/src/main/java/io/druid/client/SingleServerInventoryView.java +++ b/server/src/main/java/io/druid/client/SingleServerInventoryView.java @@ -21,37 +21,25 @@ package io.druid.client; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.collect.MapMaker; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; import io.druid.guice.ManageLifecycle; -import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; - /** */ @ManageLifecycle -public class SingleServerInventoryView extends ServerInventoryView implements FilteredServerView +public class SingleServerInventoryView extends ServerInventoryView { private static final EmittingLogger log = new EmittingLogger(SingleServerInventoryView.class); - final private ConcurrentMap> segmentPredicates = new MapMaker().makeMap(); - private final Predicate defaultFilter; - @Inject public SingleServerInventoryView( final ZkPathsConfig zkPaths, final CuratorFramework curator, - final ObjectMapper jsonMapper, - final Predicate defaultFilter + final ObjectMapper jsonMapper ) { super( @@ -60,11 +48,10 @@ public class SingleServerInventoryView extends ServerInventoryView zkPaths.getServedSegmentsPath(), curator, jsonMapper, - new TypeReference(){} + new TypeReference() + { + } ); - - Preconditions.checkNotNull(defaultFilter); - this.defaultFilter = defaultFilter; } @Override @@ -72,10 +59,7 @@ public class SingleServerInventoryView extends ServerInventoryView DruidServer container, String inventoryKey, DataSegment inventory ) { - Predicate predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values())); - if(predicate.apply(inventory)) { - addSingleInventory(container, inventory); - } + addSingleInventory(container, inventory); return container; } @@ -93,58 +77,4 @@ public class SingleServerInventoryView extends ServerInventoryView removeSingleInventory(container, inventoryKey); return container; } - - @Override - public void registerSegmentCallback( - final Executor exec, final SegmentCallback callback, final Predicate filter - ) - { - segmentPredicates.put(callback, filter); - registerSegmentCallback( - exec, new SegmentCallback() - { - @Override - public CallbackAction segmentAdded( - DruidServerMetadata server, DataSegment segment - ) - { - final CallbackAction action; - if(filter.apply(segment)) { - action = callback.segmentAdded(server, segment); - if (action.equals(CallbackAction.UNREGISTER)) { - segmentPredicates.remove(callback); - } - } else { - action = CallbackAction.CONTINUE; - } - return action; - } - - @Override - public CallbackAction segmentRemoved( - DruidServerMetadata server, DataSegment segment - ) - { - { - final CallbackAction action; - if(filter.apply(segment)) { - action = callback.segmentRemoved(server, segment); - if (action.equals(CallbackAction.UNREGISTER)) { - segmentPredicates.remove(callback); - } - } else { - action = CallbackAction.CONTINUE; - } - return action; - } - } - - @Override - public CallbackAction segmentViewInitialized() - { - return callback.segmentViewInitialized(); - } - } - ); - } } diff --git a/server/src/main/java/io/druid/guice/ServerViewModule.java b/server/src/main/java/io/druid/guice/ServerViewModule.java index 803fbfacec8..356122e65a8 100644 --- a/server/src/main/java/io/druid/guice/ServerViewModule.java +++ b/server/src/main/java/io/druid/guice/ServerViewModule.java @@ -21,8 +21,6 @@ package io.druid.guice; import com.google.inject.Binder; import com.google.inject.Module; -import io.druid.client.FilteredServerView; -import io.druid.client.FilteredServerViewProvider; import io.druid.client.InventoryView; import io.druid.client.ServerInventoryView; import io.druid.client.ServerInventoryViewProvider; @@ -36,10 +34,8 @@ public class ServerViewModule implements Module public void configure(Binder binder) { JsonConfigProvider.bind(binder, "druid.announcer", ServerInventoryViewProvider.class); - JsonConfigProvider.bind(binder, "druid.announcer", FilteredServerViewProvider.class); binder.bind(InventoryView.class).to(ServerInventoryView.class); binder.bind(ServerView.class).to(ServerInventoryView.class); binder.bind(ServerInventoryView.class).toProvider(ServerInventoryViewProvider.class).in(ManageLifecycle.class); - binder.bind(FilteredServerView.class).toProvider(FilteredServerViewProvider.class).in(ManageLifecycle.class); } } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java index 3c9ab5e7026..fe2a94e5280 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.metamx.emitter.service.ServiceEmitter; -import io.druid.client.FilteredServerView; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.guice.annotations.Processing; diff --git a/server/src/test/java/io/druid/client/BrokerServerViewTest.java b/server/src/test/java/io/druid/client/BrokerServerViewTest.java index 876e4494cc6..571448e9d2d 100644 --- a/server/src/test/java/io/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/io/druid/client/BrokerServerViewTest.java @@ -319,8 +319,7 @@ public class BrokerServerViewTest extends CuratorTestBase baseView = new BatchServerInventoryView( zkPathsConfig, curator, - jsonMapper, - Predicates.alwaysTrue() + jsonMapper ) { @Override diff --git a/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java b/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java index 13def515cf3..5b31d569fee 100644 --- a/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java +++ b/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java @@ -306,8 +306,7 @@ public class CoordinatorServerViewTest extends CuratorTestBase baseView = new BatchServerInventoryView( zkPathsConfig, curator, - jsonMapper, - Predicates.alwaysTrue() + jsonMapper ) { @Override diff --git a/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java b/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java index ff58e2ee734..d6f0d5bc027 100644 --- a/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java +++ b/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java @@ -20,8 +20,6 @@ package io.druid.client.client; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; @@ -33,7 +31,6 @@ import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.ISE; import io.druid.client.BatchServerInventoryView; import io.druid.client.DruidServer; -import io.druid.client.ServerView; import io.druid.curator.PotentiallyGzippedCompressionProvider; import io.druid.curator.announcement.Announcer; import io.druid.jackson.DefaultObjectMapper; @@ -47,9 +44,6 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingCluster; import org.apache.curator.test.Timing; -import org.easymock.EasyMock; -import org.easymock.IAnswer; -import org.easymock.LogicalOperator; import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.After; @@ -59,9 +53,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Comparator; import java.util.List; import java.util.Set; import java.util.concurrent.Callable; @@ -159,8 +151,7 @@ public class BatchServerInventoryViewTest } }, cf, - jsonMapper, - Predicates.alwaysTrue() + jsonMapper ); batchServerInventoryView.start(); @@ -175,16 +166,9 @@ public class BatchServerInventoryViewTest } }, cf, - jsonMapper, - new Predicate() - { - @Override - public boolean apply(@Nullable DataSegment dataSegment) - { - return dataSegment.getInterval().getStart().isBefore(SEGMENT_INTERVAL_START.plusDays(INITIAL_SEGMENTS)); - } - } - ){ + jsonMapper + ) + { @Override protected DruidServer addInnerInventory( DruidServer container, String inventoryKey, Set inventory @@ -243,122 +227,6 @@ public class BatchServerInventoryViewTest Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values())); } - @Test - public void testRunWithFilter() throws Exception - { - segmentAnnouncer.announceSegments(testSegments); - - waitForSync(filteredBatchServerInventoryView, testSegments); - - DruidServer server = Iterables.get(filteredBatchServerInventoryView.getInventory(), 0); - Set segments = Sets.newHashSet(server.getSegments().values()); - - Assert.assertEquals(testSegments, segments); - int prevUpdateCount = inventoryUpdateCounter.get(); - // segment outside the range of default filter - DataSegment segment1 = makeSegment(101); - segmentAnnouncer.announceSegment(segment1); - testSegments.add(segment1); - - waitForUpdateEvents(prevUpdateCount + 1); - Assert.assertNull( - Iterables.getOnlyElement(filteredBatchServerInventoryView.getInventory()) - .getSegment(segment1.getIdentifier()) - ); - } - - @Test - public void testRunWithFilterCallback() throws Exception - { - final CountDownLatch removeCallbackLatch = new CountDownLatch(1); - - segmentAnnouncer.announceSegments(testSegments); - - waitForSync(filteredBatchServerInventoryView, testSegments); - - DruidServer server = Iterables.get(filteredBatchServerInventoryView.getInventory(), 0); - Set segments = Sets.newHashSet(server.getSegments().values()); - - Assert.assertEquals(testSegments, segments); - - ServerView.SegmentCallback callback = EasyMock.createStrictMock(ServerView.SegmentCallback.class); - Comparator dataSegmentComparator = new Comparator() - { - @Override - public int compare(DataSegment o1, DataSegment o2) - { - return o1.getInterval().equals(o2.getInterval()) ? 0 : -1; - } - }; - - EasyMock - .expect( - callback.segmentAdded( - EasyMock.anyObject(), - EasyMock.cmp(makeSegment(INITIAL_SEGMENTS + 2), dataSegmentComparator, LogicalOperator.EQUAL) - ) - ) - .andReturn(ServerView.CallbackAction.CONTINUE) - .times(1); - - EasyMock - .expect( - callback.segmentRemoved( - EasyMock.anyObject(), - EasyMock.cmp(makeSegment(INITIAL_SEGMENTS + 2), dataSegmentComparator, LogicalOperator.EQUAL) - ) - ) - .andAnswer( - new IAnswer() - { - @Override - public ServerView.CallbackAction answer() throws Throwable - { - removeCallbackLatch.countDown(); - return ServerView.CallbackAction.CONTINUE; - } - } - ) - .times(1); - - - EasyMock.replay(callback); - - filteredBatchServerInventoryView.registerSegmentCallback( - MoreExecutors.sameThreadExecutor(), - callback, - new Predicate() - { - @Override - public boolean apply(@Nullable DataSegment dataSegment) - { - return dataSegment.getInterval().getStart().equals(SEGMENT_INTERVAL_START.plusDays(INITIAL_SEGMENTS + 2)); - } - } - ); - - DataSegment segment2 = makeSegment(INITIAL_SEGMENTS + 2); - segmentAnnouncer.announceSegment(segment2); - testSegments.add(segment2); - - DataSegment oldSegment = makeSegment(-1); - segmentAnnouncer.announceSegment(oldSegment); - testSegments.add(oldSegment); - - segmentAnnouncer.unannounceSegment(oldSegment); - testSegments.remove(oldSegment); - - waitForSync(filteredBatchServerInventoryView, testSegments); - - segmentAnnouncer.unannounceSegment(segment2); - testSegments.remove(segment2); - - waitForSync(filteredBatchServerInventoryView, testSegments); - timing.forWaiting().awaitLatch(removeCallbackLatch); - - EasyMock.verify(callback); - } - private DataSegment makeSegment(int offset) { return DataSegment.builder() @@ -395,7 +263,11 @@ public class BatchServerInventoryViewTest while (inventoryUpdateCounter.get() != count) { Thread.sleep(100); if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > forWaitingTiming.milliseconds()) { - throw new ISE("BatchServerInventoryView is not updating counter expected[%d] value[%d]", count, inventoryUpdateCounter.get()); + throw new ISE( + "BatchServerInventoryView is not updating counter expected[%d] value[%d]", + count, + inventoryUpdateCounter.get() + ); } } } @@ -459,7 +331,7 @@ public class BatchServerInventoryViewTest List segments = new ArrayList(); try { for (int j = 0; j < INITIAL_SEGMENTS / numThreads; ++j) { - segments.add(makeSegment(INITIAL_SEGMENTS + ii + numThreads * j)); + segments.add(makeSegment(INITIAL_SEGMENTS + ii + numThreads * j)); } latch.countDown(); latch.await(); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 62b1e537e59..9d05fac366c 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -20,7 +20,6 @@ package io.druid.segment.realtime.plumber; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Predicate; import com.google.common.base.Suppliers; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -28,8 +27,6 @@ import com.google.common.io.Files; import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.Granularity; import com.metamx.emitter.service.ServiceEmitter; -import io.druid.client.FilteredServerView; -import io.druid.client.ServerView; import io.druid.client.cache.MapCache; import io.druid.data.input.Committer; import io.druid.data.input.InputRow;