From 951a4e9b35b61737a001827c5c1e759c524dc7d5 Mon Sep 17 00:00:00 2001 From: Bingkun Guo Date: Fri, 18 Dec 2015 15:19:01 -0600 Subject: [PATCH] Remove SingleDataSegmentAnnouncer in favor of BatchDataSegmentAnnouncer --- docs/content/configuration/index.md | 14 +-- .../java/io/druid/guice/AnnouncerModule.java | 2 - .../SingleDataSegmentAnnouncer.java | 89 ------------------- .../coordination/ZkCoordinatorTest.java | 69 +++++++------- 4 files changed, 37 insertions(+), 137 deletions(-) delete mode 100644 server/src/main/java/io/druid/server/coordination/SingleDataSegmentAnnouncer.java diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index e4bae5ee1c9..40ab438cbb5 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -269,19 +269,7 @@ This config is used to find the [Coordinator](../design/coordinator.html) using ### Announcing Segments -You can optionally configure how to announce and unannounce Znodes in ZooKeeper (using Curator). For normal operations you do not need to override any of these configs. - -#### Data Segment Announcer - -Data segment announcers are used to announce segments. - -|Property|Description|Default| -|--------|-----------|-------| -|`druid.announcer.type`|Choices: legacy or batch. The type of data segment announcer to use.|batch| - -##### Single Data Segment Announcer - -In legacy Druid, each segment served by a node would be announced as an individual Znode. +You can configure how to announce and unannounce Znodes in ZooKeeper (using Curator). For normal operations you do not need to override any of these configs. ##### Batch Data Segment Announcer diff --git a/server/src/main/java/io/druid/guice/AnnouncerModule.java b/server/src/main/java/io/druid/guice/AnnouncerModule.java index a14366e5863..8d1152ea7ca 100644 --- a/server/src/main/java/io/druid/guice/AnnouncerModule.java +++ b/server/src/main/java/io/druid/guice/AnnouncerModule.java @@ -27,7 +27,6 @@ import io.druid.curator.announcement.Announcer; import io.druid.server.coordination.BatchDataSegmentAnnouncer; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.server.coordination.DataSegmentAnnouncerProvider; -import io.druid.server.coordination.SingleDataSegmentAnnouncer; import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig; import org.apache.curator.framework.CuratorFramework; @@ -42,7 +41,6 @@ public class AnnouncerModule implements Module JsonConfigProvider.bind(binder, "druid.announcer", DataSegmentAnnouncerProvider.class); binder.bind(DataSegmentAnnouncer.class).toProvider(DataSegmentAnnouncerProvider.class); binder.bind(BatchDataSegmentAnnouncer.class).in(ManageLifecycleLast.class); - binder.bind(SingleDataSegmentAnnouncer.class).in(ManageLifecycleLast.class); } @Provides diff --git a/server/src/main/java/io/druid/server/coordination/SingleDataSegmentAnnouncer.java b/server/src/main/java/io/druid/server/coordination/SingleDataSegmentAnnouncer.java deleted file mode 100644 index 32471615eb2..00000000000 --- a/server/src/main/java/io/druid/server/coordination/SingleDataSegmentAnnouncer.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.server.coordination; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.inject.Inject; -import com.metamx.common.logger.Logger; -import io.druid.curator.announcement.Announcer; -import io.druid.server.initialization.ZkPathsConfig; -import io.druid.timeline.DataSegment; -import org.apache.curator.utils.ZKPaths; - -import java.io.IOException; - -public class SingleDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer -{ - private static final Logger log = new Logger(SingleDataSegmentAnnouncer.class); - - private final Announcer announcer; - private final ObjectMapper jsonMapper; - private final String servedSegmentsLocation; - - @Inject - public SingleDataSegmentAnnouncer( - DruidServerMetadata server, - ZkPathsConfig config, - Announcer announcer, - ObjectMapper jsonMapper - ) - { - super(server, config, announcer, jsonMapper); - - this.announcer = announcer; - this.jsonMapper = jsonMapper; - this.servedSegmentsLocation = ZKPaths.makePath(config.getServedSegmentsPath(), server.getName()); - } - - public void announceSegment(DataSegment segment) throws IOException - { - final String path = makeServedSegmentPath(segment); - log.info("Announcing segment[%s] to path[%s]", segment.getIdentifier(), path); - announcer.announce(path, jsonMapper.writeValueAsBytes(segment)); - } - - public void unannounceSegment(DataSegment segment) throws IOException - { - final String path = makeServedSegmentPath(segment); - log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), path); - announcer.unannounce(path); - } - - @Override - public void announceSegments(Iterable segments) throws IOException - { - for (DataSegment segment : segments) { - announceSegment(segment); - } - } - - @Override - public void unannounceSegments(Iterable segments) throws IOException - { - for (DataSegment segment : segments) { - unannounceSegment(segment); - } - } - - private String makeServedSegmentPath(DataSegment segment) - { - return ZKPaths.makePath(servedSegmentsLocation, segment.getIdentifier()); - } -} diff --git a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java index 729dc49d66d..0233a29c1a6 100644 --- a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java @@ -19,35 +19,6 @@ package io.druid.server.coordination; -import io.druid.client.cache.CacheConfig; -import io.druid.client.cache.LocalCacheProvider; -import io.druid.concurrent.Execs; -import io.druid.curator.CuratorTestBase; -import io.druid.curator.announcement.Announcer; -import io.druid.jackson.DefaultObjectMapper; -import io.druid.query.NoopQueryRunnerFactoryConglomerate; -import io.druid.segment.IndexIO; -import io.druid.segment.loading.CacheTestSegmentLoader; -import io.druid.segment.loading.SegmentLoaderConfig; -import io.druid.server.initialization.ZkPathsConfig; -import io.druid.server.metrics.NoopServiceEmitter; -import io.druid.timeline.DataSegment; -import io.druid.timeline.partition.NoneShardSpec; - -import java.io.File; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.curator.framework.CuratorFramework; -import org.joda.time.Interval; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -61,6 +32,34 @@ import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; +import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.LocalCacheProvider; +import io.druid.concurrent.Execs; +import io.druid.curator.CuratorTestBase; +import io.druid.curator.announcement.Announcer; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.NoopQueryRunnerFactoryConglomerate; +import io.druid.segment.IndexIO; +import io.druid.segment.loading.CacheTestSegmentLoader; +import io.druid.segment.loading.SegmentLoaderConfig; +import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig; +import io.druid.server.initialization.ZkPathsConfig; +import io.druid.server.metrics.NoopServiceEmitter; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NoneShardSpec; +import org.apache.curator.framework.CuratorFramework; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; /** */ @@ -117,8 +116,12 @@ public class ZkCoordinatorTest extends CuratorTestBase announceCount = new AtomicInteger(0); announcer = new DataSegmentAnnouncer() { - private final DataSegmentAnnouncer delegate = new SingleDataSegmentAnnouncer( - me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper + private final DataSegmentAnnouncer delegate = new BatchDataSegmentAnnouncer( + me, + new BatchDataSegmentAnnouncerConfig(), + zkPaths, + new Announcer(curator, Execs.singleThreaded("blah")), + jsonMapper ); @Override @@ -191,7 +194,7 @@ public class ZkCoordinatorTest extends CuratorTestBase public void testLoadCache() throws Exception { List segments = Lists.newLinkedList(); - for(int i = 0; i < COUNT; ++i) { + for (int i = 0; i < COUNT; ++i) { segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-01"))); segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-02"))); segments.add(makeSegment("test" + i, "2", new Interval("P1d/2011-04-02"))); @@ -216,7 +219,7 @@ public class ZkCoordinatorTest extends CuratorTestBase Assert.assertTrue(serverManager.getDataSourceCounts().isEmpty()); zkCoordinator.start(); Assert.assertTrue(!serverManager.getDataSourceCounts().isEmpty()); - for(int i = 0; i < COUNT; ++i) { + for (int i = 0; i < COUNT; ++i) { Assert.assertEquals(11L, serverManager.getDataSourceCounts().get("test" + i).longValue()); Assert.assertEquals(2L, serverManager.getDataSourceCounts().get("test_two" + i).longValue()); }