From caa68e101ae4f41de0a65f7987bb2ad44030bdfe Mon Sep 17 00:00:00 2001 From: fjy Date: Wed, 19 Jun 2013 15:56:45 -0700 Subject: [PATCH 1/2] first commit; things working right now --- .../java/com/metamx/druid/QueryableNode.java | 50 ++++-- .../AbstractDataSegmentAnnouncer.java | 100 +++++++++++ .../BatchingCuratorDataSegmentAnnouncer.java | 155 ++++++++++++++++++ .../CuratorDataSegmentAnnouncer.java | 68 ++------ .../coordination/DataSegmentAnnouncer.java | 24 +++ .../coordination/DruidServerMetadata.java | 2 +- ...aSegmentAnnouncerDataSegmentAnnouncer.java | 80 +++++++++ .../metamx/druid/curator/SegmentReader.java | 46 ++++++ .../druid/curator/announcement/Announcer.java | 53 +++++- .../druid/initialization/ZkPathsConfig.java | 15 ++ .../examples/RealtimeStandaloneMain.java | 12 ++ .../common/task/RealtimeIndexTask.java | 24 ++- .../coordinator/TaskMasterLifecycle.java | 4 +- .../http/IndexerCoordinatorNode.java | 106 ++++++------ .../NoopResourceManagementScheduler.java | 52 ++++++ .../coordinator/RemoteTaskRunnerTest.java | 12 ++ .../druid/coordination/ZkCoordinator.java | 84 +++++++++- .../coordination/ZkCoordinatorConfig.java | 6 + .../druid/loading/SingleSegmentLoader.java | 4 +- .../com/metamx/druid/utils/DruidSetup.java | 12 ++ .../druid/coordination/ZkCoordinatorTest.java | 20 ++- .../metamx/druid/master/DruidMasterTest.java | 12 ++ 22 files changed, 810 insertions(+), 131 deletions(-) create mode 100644 client/src/main/java/com/metamx/druid/coordination/AbstractDataSegmentAnnouncer.java create mode 100644 client/src/main/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncer.java create mode 100644 client/src/main/java/com/metamx/druid/coordination/MultipleDataSegmentAnnouncerDataSegmentAnnouncer.java create mode 100644 client/src/main/java/com/metamx/druid/curator/SegmentReader.java create mode 100644 indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/NoopResourceManagementScheduler.java diff --git a/client/src/main/java/com/metamx/druid/QueryableNode.java b/client/src/main/java/com/metamx/druid/QueryableNode.java index 68f978929d6..123df181f8d 100644 --- a/client/src/main/java/com/metamx/druid/QueryableNode.java +++ b/client/src/main/java/com/metamx/druid/QueryableNode.java @@ -39,9 +39,12 @@ import com.metamx.druid.client.ServerInventoryView; import com.metamx.druid.client.ServerInventoryViewConfig; import com.metamx.druid.client.ServerView; import com.metamx.druid.concurrent.Execs; +import com.metamx.druid.coordination.BatchingCuratorDataSegmentAnnouncer; import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer; import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.coordination.DruidServerMetadata; +import com.metamx.druid.coordination.MultipleDataSegmentAnnouncerDataSegmentAnnouncer; +import com.metamx.druid.curator.SegmentReader; import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.http.RequestLogger; import com.metamx.druid.initialization.CuratorConfig; @@ -368,18 +371,21 @@ public abstract class QueryableNode extends Registering if (requestLogger == null) { try { final String loggingType = props.getProperty("druid.request.logging.type"); - if("emitter".equals(loggingType)) { - setRequestLogger(Initialization.makeEmittingRequestLogger( - getProps(), - getEmitter() - )); - } - else { - setRequestLogger(Initialization.makeFileRequestLogger( - getJsonMapper(), - getScheduledExecutorFactory(), - getProps() - )); + if ("emitter".equals(loggingType)) { + setRequestLogger( + Initialization.makeEmittingRequestLogger( + getProps(), + getEmitter() + ) + ); + } else { + setRequestLogger( + Initialization.makeFileRequestLogger( + getJsonMapper(), + getScheduledExecutorFactory(), + getProps() + ) + ); } } catch (IOException e) { @@ -421,7 +427,25 @@ public abstract class QueryableNode extends Registering final Announcer announcer = new Announcer(getCuratorFramework(), Execs.singleThreaded("Announcer-%s")); lifecycle.addManagedInstance(announcer); - setAnnouncer(new CuratorDataSegmentAnnouncer(getDruidServerMetadata(), getZkPaths(), announcer, getJsonMapper())); + setAnnouncer( + new MultipleDataSegmentAnnouncerDataSegmentAnnouncer( + getDruidServerMetadata(), + getZkPaths(), + announcer, + getJsonMapper(), + Arrays.asList( + new BatchingCuratorDataSegmentAnnouncer( + getDruidServerMetadata(), + getZkPaths(), + announcer, + getJsonMapper(), + new SegmentReader(curator, getJsonMapper()) + ), + new CuratorDataSegmentAnnouncer(getDruidServerMetadata(), getZkPaths(), announcer, getJsonMapper()) + ) + ) + ); + lifecycle.addManagedInstance(getAnnouncer(), Lifecycle.Stage.LAST); } } diff --git a/client/src/main/java/com/metamx/druid/coordination/AbstractDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/AbstractDataSegmentAnnouncer.java new file mode 100644 index 00000000000..583910e3dac --- /dev/null +++ b/client/src/main/java/com/metamx/druid/coordination/AbstractDataSegmentAnnouncer.java @@ -0,0 +1,100 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.coordination; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Throwables; +import com.metamx.common.lifecycle.LifecycleStart; +import com.metamx.common.lifecycle.LifecycleStop; +import com.metamx.common.logger.Logger; +import com.metamx.druid.curator.announcement.Announcer; +import com.metamx.druid.initialization.ZkPathsConfig; +import org.apache.curator.utils.ZKPaths; + +/** + */ +public abstract class AbstractDataSegmentAnnouncer implements DataSegmentAnnouncer +{ + private static final Logger log = new Logger(AbstractDataSegmentAnnouncer.class); + + private final DruidServerMetadata server; + private final ZkPathsConfig config; + private final Announcer announcer; + private final ObjectMapper jsonMapper; + + private final Object lock = new Object(); + + private volatile boolean started = false; + + protected AbstractDataSegmentAnnouncer( + DruidServerMetadata server, + ZkPathsConfig config, + Announcer announcer, + ObjectMapper jsonMapper + ) + { + this.server = server; + this.config = config; + this.announcer = announcer; + this.jsonMapper = jsonMapper; + } + + @LifecycleStart + public void start() + { + synchronized (lock) { + if (started) { + return; + } + + try { + final String path = makeAnnouncementPath(); + log.info("Announcing self[%s] at [%s]", server, path); + announcer.announce(path, jsonMapper.writeValueAsBytes(server)); + } + catch (JsonProcessingException e) { + throw Throwables.propagate(e); + } + + started = true; + } + } + + @LifecycleStop + public void stop() + { + synchronized (lock) { + if (!started) { + return; + } + + log.info("Stopping CuratorDataSegmentAnnouncer with config[%s]", config); + announcer.unannounce(makeAnnouncementPath()); + + started = false; + } + } + + private String makeAnnouncementPath() + { + return ZKPaths.makePath(config.getAnnouncementsPath(), server.getName()); + } +} diff --git a/client/src/main/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncer.java new file mode 100644 index 00000000000..68d9e2d11a6 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncer.java @@ -0,0 +1,155 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.coordination; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.metamx.common.ISE; +import com.metamx.common.lifecycle.LifecycleStart; +import com.metamx.common.lifecycle.LifecycleStop; +import com.metamx.common.logger.Logger; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.curator.SegmentReader; +import com.metamx.druid.curator.announcement.Announcer; +import com.metamx.druid.initialization.ZkPathsConfig; +import org.apache.curator.utils.ZKPaths; +import org.joda.time.DateTime; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + */ +public class BatchingCuratorDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer +{ + private static final Logger log = new Logger(BatchingCuratorDataSegmentAnnouncer.class); + + private final ZkPathsConfig config; + private final Announcer announcer; + private final ObjectMapper jsonMapper; + private final SegmentReader segmentReader; + private final String liveSegmentLocation; + + private final Map zNodes = Maps.newHashMap(); + private final Map segmentLookup = new ConcurrentHashMap(); + + public BatchingCuratorDataSegmentAnnouncer( + DruidServerMetadata server, + ZkPathsConfig config, + Announcer announcer, + ObjectMapper jsonMapper, + SegmentReader segmentReader + ) + { + super(server, config, announcer, jsonMapper); + + this.config = config; + this.announcer = announcer; + this.jsonMapper = jsonMapper; + this.segmentReader = segmentReader; + this.liveSegmentLocation = ZKPaths.makePath(config.getLiveSegmentsPath(), server.getName()); + } + + @Override + public void announceSegment(DataSegment segment) throws IOException + { + Map.Entry zNode = (zNodes.entrySet().isEmpty()) ? null : zNodes.entrySet().iterator().next(); + + final String path = (zNode == null) ? makeServedSegmentPath(new DateTime().toString()) : zNode.getKey(); + + Set zkSegments = segmentReader.read(path); + zkSegments.add(segment); + if (zkSegments.size() >= config.getSegmentsPerNode()) { + zNodes.remove(path); + } else { + zNodes.put(path, zkSegments.size()); + } + segmentLookup.put(segment.getIdentifier(), path); + + log.info("Announcing segment[%s] to path[%s]", segment.getIdentifier(), path); + + byte[] bytes = jsonMapper.writeValueAsBytes(zkSegments); + if (bytes.length > config.getMaxNumBytes()) { + throw new ISE("byte size %,d exceeds %,d", bytes.length, config.getMaxNumBytes()); + } + + announcer.update(path, bytes); + } + + @Override + public void unannounceSegment(DataSegment segment) throws IOException + { + final String path = segmentLookup.get(segment.getIdentifier()); + + Set zkSegments = segmentReader.read(path); + zkSegments.remove(segment); + + log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), path); + if (zkSegments.isEmpty()) { + announcer.unannounce(path); + } else if (zkSegments.size() < config.getSegmentsPerNode()) { + zNodes.put(path, zkSegments.size()); + announcer.update(path, jsonMapper.writeValueAsBytes(zkSegments)); + } + } + + @Override + public void announceSegments(Iterable segments) throws IOException + { + Iterable> batched = Iterables.partition(segments, config.getSegmentsPerNode()); + + for (List batch : batched) { + final String path = makeServedSegmentPath(new DateTime().toString()); + for (DataSegment segment : batch) { + log.info("Announcing segment[%s] to path[%s]", segment.getIdentifier(), path); + segmentLookup.put(segment.getIdentifier(), path); + } + if (batch.size() < config.getSegmentsPerNode()) { + zNodes.put(path, batch.size()); + } + + byte[] bytes = jsonMapper.writeValueAsBytes(batch); + if (bytes.length > config.getMaxNumBytes()) { + throw new ISE("byte size %,d exceeds %,d", bytes.length, config.getMaxNumBytes()); + } + + announcer.update(path, bytes); + } + } + + @Override + public void unannounceSegments(Iterable segments) throws IOException + { + for (DataSegment segment : segments) { + unannounceSegment(segment); + } + } + + private String makeServedSegmentPath(String zNode) + { + return ZKPaths.makePath(liveSegmentLocation, zNode); + } +} diff --git a/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java index a7efc6cb9e8..f9286d1a239 100644 --- a/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java +++ b/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java @@ -19,11 +19,7 @@ package com.metamx.druid.coordination; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Throwables; -import com.metamx.common.lifecycle.LifecycleStart; -import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.curator.announcement.Announcer; @@ -32,20 +28,14 @@ import org.apache.curator.utils.ZKPaths; import java.io.IOException; -public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer +public class CuratorDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer { private static final Logger log = new Logger(CuratorDataSegmentAnnouncer.class); - private final Object lock = new Object(); - - private final DruidServerMetadata server; - private final ZkPathsConfig config; private final Announcer announcer; private final ObjectMapper jsonMapper; private final String servedSegmentsLocation; - private volatile boolean started = false; - public CuratorDataSegmentAnnouncer( DruidServerMetadata server, ZkPathsConfig config, @@ -53,49 +43,13 @@ public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer ObjectMapper jsonMapper ) { - this.server = server; - this.config = config; + super(server, config, announcer, jsonMapper); + this.announcer = announcer; this.jsonMapper = jsonMapper; this.servedSegmentsLocation = ZKPaths.makePath(config.getServedSegmentsPath(), server.getName()); } - @LifecycleStart - public void start() - { - synchronized (lock) { - if (started) { - return; - } - - try { - final String path = makeAnnouncementPath(); - log.info("Announcing self[%s] at [%s]", server, path); - announcer.announce(path, jsonMapper.writeValueAsBytes(server)); - } - catch (JsonProcessingException e) { - throw Throwables.propagate(e); - } - - started = true; - } - } - - @LifecycleStop - public void stop() - { - synchronized (lock) { - if (!started) { - return; - } - - log.info("Stopping CuratorDataSegmentAnnouncer with config[%s]", config); - announcer.unannounce(makeAnnouncementPath()); - - started = false; - } - } - public void announceSegment(DataSegment segment) throws IOException { final String path = makeServedSegmentPath(segment); @@ -110,8 +64,20 @@ public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer announcer.unannounce(path); } - private String makeAnnouncementPath() { - return ZKPaths.makePath(config.getAnnouncementsPath(), server.getName()); + @Override + public void announceSegments(Iterable segments) throws IOException + { + for (DataSegment segment : segments) { + announceSegment(segment); + } + } + + @Override + public void unannounceSegments(Iterable segments) throws IOException + { + for (DataSegment segment : segments) { + unannounceSegment(segment); + } } private String makeServedSegmentPath(DataSegment segment) diff --git a/client/src/main/java/com/metamx/druid/coordination/DataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/DataSegmentAnnouncer.java index 699c4b1e8ce..71eaaa37276 100644 --- a/client/src/main/java/com/metamx/druid/coordination/DataSegmentAnnouncer.java +++ b/client/src/main/java/com/metamx/druid/coordination/DataSegmentAnnouncer.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.coordination; import com.metamx.druid.client.DataSegment; @@ -7,5 +26,10 @@ import java.io.IOException; public interface DataSegmentAnnouncer { public void announceSegment(DataSegment segment) throws IOException; + public void unannounceSegment(DataSegment segment) throws IOException; + + public void announceSegments(Iterable segments) throws IOException; + + public void unannounceSegments(Iterable segments) throws IOException; } diff --git a/client/src/main/java/com/metamx/druid/coordination/DruidServerMetadata.java b/client/src/main/java/com/metamx/druid/coordination/DruidServerMetadata.java index 25c9c9875e8..232df085e07 100644 --- a/client/src/main/java/com/metamx/druid/coordination/DruidServerMetadata.java +++ b/client/src/main/java/com/metamx/druid/coordination/DruidServerMetadata.java @@ -80,7 +80,7 @@ public class DruidServerMetadata @Override public String toString() { - return "DruidServer{" + + return "DruidServerMetadata{" + "name='" + name + '\'' + ", host='" + host + '\'' + ", maxSize=" + maxSize + diff --git a/client/src/main/java/com/metamx/druid/coordination/MultipleDataSegmentAnnouncerDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/MultipleDataSegmentAnnouncerDataSegmentAnnouncer.java new file mode 100644 index 00000000000..d4b3678a917 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/coordination/MultipleDataSegmentAnnouncerDataSegmentAnnouncer.java @@ -0,0 +1,80 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.coordination; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.curator.announcement.Announcer; +import com.metamx.druid.initialization.ZkPathsConfig; + +import java.io.IOException; + +/** + * This class has the greatest name ever + */ +public class MultipleDataSegmentAnnouncerDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer +{ + private final Iterable dataSegmentAnnouncers; + + public MultipleDataSegmentAnnouncerDataSegmentAnnouncer( + DruidServerMetadata server, + ZkPathsConfig config, + Announcer announcer, + ObjectMapper jsonMapper, + Iterable dataSegmentAnnouncers + ) + { + super(server, config, announcer, jsonMapper); + + this.dataSegmentAnnouncers = dataSegmentAnnouncers; + } + + @Override + public void announceSegment(DataSegment segment) throws IOException + { + for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) { + dataSegmentAnnouncer.announceSegment(segment); + } + } + + @Override + public void unannounceSegment(DataSegment segment) throws IOException + { + for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) { + dataSegmentAnnouncer.unannounceSegment(segment); + } + } + + @Override + public void announceSegments(Iterable segments) throws IOException + { + for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) { + dataSegmentAnnouncer.announceSegments(segments); + } + } + + @Override + public void unannounceSegments(Iterable segments) throws IOException + { + for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) { + dataSegmentAnnouncer.unannounceSegments(segments); + } + } +} diff --git a/client/src/main/java/com/metamx/druid/curator/SegmentReader.java b/client/src/main/java/com/metamx/druid/curator/SegmentReader.java new file mode 100644 index 00000000000..43674a88d9c --- /dev/null +++ b/client/src/main/java/com/metamx/druid/curator/SegmentReader.java @@ -0,0 +1,46 @@ +package com.metamx.druid.curator; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Throwables; +import com.google.common.collect.Sets; +import com.metamx.common.logger.Logger; +import com.metamx.druid.client.DataSegment; +import org.apache.curator.framework.CuratorFramework; + +import java.util.Set; + +/** + */ +public class SegmentReader +{ + private static final Logger log = new Logger(SegmentReader.class); + + private final CuratorFramework cf; + private final ObjectMapper jsonMapper; + + public SegmentReader(CuratorFramework cf, ObjectMapper jsonMapper) + { + this.cf = cf; + this.jsonMapper = jsonMapper; + } + + public Set read(String path) + { + try { + if (cf.checkExists().forPath(path) != null) { + return jsonMapper.readValue( + cf.getData().forPath(path), new TypeReference>() + { + } + ); + } + } + catch (Exception e) { + log.error("Unable to read any segment ids from %s", path); + throw Throwables.propagate(e); + } + + return Sets.newHashSet(); + } +} diff --git a/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java b/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java index f7c074938c7..b586ccfa46a 100644 --- a/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java +++ b/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.curator.announcement; import com.google.common.base.Throwables; @@ -21,6 +40,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; import java.util.List; import java.util.Map; @@ -99,7 +119,7 @@ public class Announcer * Announces the provided bytes at the given path. Announcement means that it will create an ephemeral node * and monitor it to make sure that it always exists until it is unannounced or this object is closed. * - * @param path The path to announce at + * @param path The path to announce at * @param bytes The payload to announce */ public void announce(String path, byte[] bytes) @@ -127,7 +147,7 @@ public class Announcer // Synchronize to make sure that I only create a listener once. synchronized (finalSubPaths) { - if (! listeners.containsKey(parentPath)) { + if (!listeners.containsKey(parentPath)) { final PathChildrenCache cache = factory.make(curator, parentPath); cache.getListenable().addListener( new PathChildrenCacheListener() @@ -226,15 +246,42 @@ public class Announcer } } + public void update(final String path, final byte[] bytes) + { + final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); + + final String parentPath = pathAndNode.getPath(); + final String nodePath = pathAndNode.getNode(); + + ConcurrentMap subPaths = announcements.get(parentPath); + + if (subPaths == null || subPaths.get(nodePath) == null) { + announce(path, bytes); + return; + } + + try { + updateAnnouncement(path, bytes); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + private String createAnnouncement(final String path, byte[] value) throws Exception { return curator.create().compressed().withMode(CreateMode.EPHEMERAL).inBackground().forPath(path, value); } + private Stat updateAnnouncement(final String path, final byte[] value) throws Exception + { + return curator.setData().compressed().inBackground().forPath(path, value); + } + /** * Unannounces an announcement created at path. Note that if all announcements get removed, the Announcer * will continue to have ZK watches on paths because clearing them out is a source of ugly race conditions. - * + *

* If you need to completely clear all the state of what is being watched and announced, stop() the Announcer. * * @param path diff --git a/client/src/main/java/com/metamx/druid/initialization/ZkPathsConfig.java b/client/src/main/java/com/metamx/druid/initialization/ZkPathsConfig.java index 2f04e61b309..c36f82e086b 100644 --- a/client/src/main/java/com/metamx/druid/initialization/ZkPathsConfig.java +++ b/client/src/main/java/com/metamx/druid/initialization/ZkPathsConfig.java @@ -21,12 +21,21 @@ package com.metamx.druid.initialization; import org.apache.curator.utils.ZKPaths; import org.skife.config.Config; +import org.skife.config.Default; public abstract class ZkPathsConfig { @Config("druid.zk.paths.base") public abstract String getZkBasePath(); + @Config("druid.zk.segmentsPerNode") + @Default("50") + public abstract int getSegmentsPerNode(); + + @Config("druid.zk.maxNumBytesPerNode") + @Default("512000") + public abstract long getMaxNumBytes(); + @Config("druid.zk.paths.propertiesPath") public String getPropertiesPath() { @@ -45,6 +54,12 @@ public abstract class ZkPathsConfig return defaultPath("servedSegments"); } + @Config("druid.zk.paths.liveSegmentsPath") + public String getLiveSegmentsPath() + { + return defaultPath("segments"); + } + @Config("druid.zk.paths.loadQueuePath") public String getLoadQueuePath() { diff --git a/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java b/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java index 4cb267f571f..b71ad732b66 100644 --- a/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java +++ b/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java @@ -142,5 +142,17 @@ public class RealtimeStandaloneMain { // do nothing } + + @Override + public void announceSegments(Iterable segments) throws IOException + { + // do nothing + } + + @Override + public void unannounceSegments(Iterable segments) throws IOException + { + // do nothing + } } } \ No newline at end of file diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java index d6f775a2611..d5dd40472f4 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java @@ -30,7 +30,6 @@ import com.metamx.druid.Query; import com.metamx.druid.client.DataSegment; import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.index.v1.IndexGranularity; -import com.metamx.druid.input.InputRow; import com.metamx.druid.indexing.common.TaskLock; import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.TaskToolbox; @@ -38,6 +37,7 @@ import com.metamx.druid.indexing.common.actions.LockAcquireAction; import com.metamx.druid.indexing.common.actions.LockListAction; import com.metamx.druid.indexing.common.actions.LockReleaseAction; import com.metamx.druid.indexing.common.actions.SegmentInsertAction; +import com.metamx.druid.input.InputRow; import com.metamx.druid.query.FinalizeResultsQueryRunner; import com.metamx.druid.query.QueryRunner; import com.metamx.druid.query.QueryRunnerFactory; @@ -226,6 +226,28 @@ public class RealtimeIndexTask extends AbstractTask toolbox.getTaskActionClient().submit(new LockReleaseAction(segment.getInterval())); } } + + @Override + public void announceSegments(Iterable segments) throws IOException + { + for (DataSegment segment : segments) { + toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval())); + } + toolbox.getSegmentAnnouncer().announceSegments(segments); + } + + @Override + public void unannounceSegments(Iterable segments) throws IOException + { + try { + toolbox.getSegmentAnnouncer().unannounceSegments(segments); + } + finally { + for (DataSegment segment : segments) { + toolbox.getTaskActionClient().submit(new LockReleaseAction(segment.getInterval())); + } + } + } }; // NOTE: getVersion will block if there is lock contention, which will block plumber.getSink diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskMasterLifecycle.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskMasterLifecycle.java index d83ecf2f747..8b4e3fba6e1 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskMasterLifecycle.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskMasterLifecycle.java @@ -106,9 +106,7 @@ public class TaskMasterLifecycle Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, leaderLifecycle); leaderLifecycle.addManagedInstance(taskConsumer); - if (indexerCoordinatorConfig.isAutoScalingEnabled()) { - leaderLifecycle.addManagedInstance(resourceManagementScheduler); - } + leaderLifecycle.addManagedInstance(resourceManagementScheduler); try { leaderLifecycle.start(); diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java index 69a9d4dac88..b07ae5f1792 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java @@ -55,10 +55,6 @@ import com.metamx.druid.http.GuiceServletConfig; import com.metamx.druid.http.RedirectFilter; import com.metamx.druid.http.RedirectInfo; import com.metamx.druid.http.StatusServlet; -import com.metamx.druid.initialization.Initialization; -import com.metamx.druid.initialization.ServerConfig; -import com.metamx.druid.initialization.ServiceDiscoveryConfig; -import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.indexing.common.RetryPolicyFactory; import com.metamx.druid.indexing.common.actions.LocalTaskActionClientFactory; import com.metamx.druid.indexing.common.actions.TaskActionClientFactory; @@ -93,12 +89,17 @@ import com.metamx.druid.indexing.coordinator.config.RemoteTaskRunnerConfig; import com.metamx.druid.indexing.coordinator.scaling.AutoScalingStrategy; import com.metamx.druid.indexing.coordinator.scaling.EC2AutoScalingStrategy; import com.metamx.druid.indexing.coordinator.scaling.NoopAutoScalingStrategy; +import com.metamx.druid.indexing.coordinator.scaling.NoopResourceManagementScheduler; import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementScheduler; import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerConfig; import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory; import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagementStrategy; import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagmentConfig; import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData; +import com.metamx.druid.initialization.Initialization; +import com.metamx.druid.initialization.ServerConfig; +import com.metamx.druid.initialization.ServiceDiscoveryConfig; +import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.utils.PropUtils; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.core.Emitters; @@ -687,53 +688,64 @@ public class IndexerCoordinatorNode extends QueryableNode workerSetupData = configManager.watch( - WorkerSetupData.CONFIG_KEY, WorkerSetupData.class - ); - - AutoScalingStrategy strategy; - if (config.getAutoScalingImpl().equalsIgnoreCase("ec2")) { - strategy = new EC2AutoScalingStrategy( - getJsonMapper(), - new AmazonEC2Client( - new BasicAWSCredentials( - PropUtils.getProperty(getProps(), "com.metamx.aws.accessKey"), - PropUtils.getProperty(getProps(), "com.metamx.aws.secretKey") - ) - ), - getConfigFactory().build(EC2AutoScalingStrategyConfig.class), - workerSetupData - ); - } else if (config.getAutoScalingImpl().equalsIgnoreCase("noop")) { - strategy = new NoopAutoScalingStrategy(); - } else { - throw new ISE("Invalid strategy implementation: %s", config.getAutoScalingImpl()); + @Override + public ResourceManagementScheduler build(TaskRunner runner) + { + return new NoopResourceManagementScheduler(); } + }; + } else { + resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory() + { + @Override + public ResourceManagementScheduler build(TaskRunner runner) + { + final ScheduledExecutorService scalingScheduledExec = Executors.newScheduledThreadPool( + 1, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("ScalingExec--%d") + .build() + ); + final AtomicReference workerSetupData = configManager.watch( + WorkerSetupData.CONFIG_KEY, WorkerSetupData.class + ); - return new ResourceManagementScheduler( - runner, - new SimpleResourceManagementStrategy( - strategy, - getConfigFactory().build(SimpleResourceManagmentConfig.class), + AutoScalingStrategy strategy; + if (config.getAutoScalingImpl().equalsIgnoreCase("ec2")) { + strategy = new EC2AutoScalingStrategy( + getJsonMapper(), + new AmazonEC2Client( + new BasicAWSCredentials( + PropUtils.getProperty(getProps(), "com.metamx.aws.accessKey"), + PropUtils.getProperty(getProps(), "com.metamx.aws.secretKey") + ) + ), + getConfigFactory().build(EC2AutoScalingStrategyConfig.class), workerSetupData - ), - getConfigFactory().build(ResourceManagementSchedulerConfig.class), - scalingScheduledExec - ); - } - }; + ); + } else if (config.getAutoScalingImpl().equalsIgnoreCase("noop")) { + strategy = new NoopAutoScalingStrategy(); + } else { + throw new ISE("Invalid strategy implementation: %s", config.getAutoScalingImpl()); + } + + return new ResourceManagementScheduler( + runner, + new SimpleResourceManagementStrategy( + strategy, + getConfigFactory().build(SimpleResourceManagmentConfig.class), + workerSetupData + ), + getConfigFactory().build(ResourceManagementSchedulerConfig.class), + scalingScheduledExec + ); + } + }; + } } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/NoopResourceManagementScheduler.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/NoopResourceManagementScheduler.java new file mode 100644 index 00000000000..a4f9c274eec --- /dev/null +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/NoopResourceManagementScheduler.java @@ -0,0 +1,52 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.indexing.coordinator.scaling; + +import com.metamx.common.logger.Logger; + +/** + */ +public class NoopResourceManagementScheduler extends ResourceManagementScheduler +{ + private static final Logger log = new Logger(NoopResourceManagementScheduler.class); + + public NoopResourceManagementScheduler() + { + super(null, null, null, null); + } + + @Override + public void start() + { + log.info("Autoscaling is disabled."); + } + + @Override + public void stop() + { + // do nothing + } + + @Override + public ScalingStats getStats() + { + return new ScalingStats(0); + } +} diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java index 0a11fcb49fb..2a9fca0589b 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java @@ -283,6 +283,12 @@ public class RemoteTaskRunnerTest jsonMapper, new IndexerZkConfig() { + @Override + public int getSegmentsPerNode() + { + return 1; + } + @Override public String getIndexerAnnouncementPath() { @@ -448,5 +454,11 @@ public class RemoteTaskRunnerTest { return 1000; } + + @Override + public int getSegmentsPerNode() + { + return 1; + } } } diff --git a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java index 1befc1df888..f2d95375819 100644 --- a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java +++ b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java @@ -21,6 +21,7 @@ package com.metamx.druid.coordination; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; @@ -37,6 +38,7 @@ import org.apache.curator.utils.ZKPaths; import java.io.File; import java.io.IOException; +import java.util.List; /** */ @@ -103,7 +105,9 @@ public class ZkCoordinator implements DataSegmentChangeHandler curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient()); curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient()); - loadCache(); + if (config.isLoadFromSegmentCacheEnabled()) { + loadCache(); + } loadQueueCache.getListenable().addListener( new PathChildrenCacheListener() @@ -136,9 +140,9 @@ public class ZkCoordinator implements DataSegmentChangeHandler } log.makeAlert(e, "Segment load/unload: uncaught exception.") - .addData("node", path) - .addData("nodeProperties", segment) - .emit(); + .addData("node", path) + .addData("nodeProperties", segment) + .emit(); } break; @@ -192,12 +196,14 @@ public class ZkCoordinator implements DataSegmentChangeHandler return; } + List cachedSegments = Lists.newArrayList(); for (File file : baseDir.listFiles()) { log.info("Loading segment cache file [%s]", file); try { DataSegment segment = jsonMapper.readValue(file, DataSegment.class); if (serverManager.isSegmentCached(segment)) { - addSegment(segment); + cachedSegments.add(segment); + //addSegment(segment); } else { log.warn("Unable to find cache file for %s. Deleting lookup entry", segment.getIdentifier()); @@ -213,6 +219,8 @@ public class ZkCoordinator implements DataSegmentChangeHandler .emit(); } } + + addSegments(cachedSegments); } @Override @@ -239,14 +247,50 @@ public class ZkCoordinator implements DataSegmentChangeHandler removeSegment(segment); throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier()); } + } catch (SegmentLoadingException e) { log.makeAlert(e, "Failed to load segment for dataSource") - .addData("segment", segment) - .emit(); + .addData("segment", segment) + .emit(); } } + public void addSegments(Iterable segments) + { + try { + for (DataSegment segment : segments) { + + serverManager.loadSegment(segment); + + File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier()); + try { + jsonMapper.writeValue(segmentInfoCacheFile, segment); + } + catch (IOException e) { + removeSegment(segment); + throw new SegmentLoadingException( + e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile + ); + } + } + + try { + announcer.announceSegments(segments); + } + catch (IOException e) { + removeSegments(segments); + throw new SegmentLoadingException(e, "Failed to announce segments[%s]", segments); + } + } + catch (SegmentLoadingException e) { + log.makeAlert(e, "Failed to load segments for dataSource") + .addData("segments", segments) + .emit(); + } + } + + @Override public void removeSegment(DataSegment segment) { @@ -262,8 +306,30 @@ public class ZkCoordinator implements DataSegmentChangeHandler } catch (Exception e) { log.makeAlert("Failed to remove segment") - .addData("segment", segment) - .emit(); + .addData("segment", segment) + .emit(); + } + } + + public void removeSegments(Iterable segments) + { + try { + for (DataSegment segment : segments) { + serverManager.dropSegment(segment); + + File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier()); + if (!segmentInfoCacheFile.delete()) { + log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); + } + + } + + announcer.unannounceSegments(segments); + } + catch (Exception e) { + log.makeAlert("Failed to remove segments") + .addData("segments", segments) + .emit(); } } } diff --git a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinatorConfig.java b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinatorConfig.java index 262f92e0c87..12091968428 100644 --- a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinatorConfig.java +++ b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinatorConfig.java @@ -29,4 +29,10 @@ public abstract class ZkCoordinatorConfig { @Config("druid.paths.segmentInfoCache") public abstract File getSegmentInfoCacheDirectory(); + + @Config("druid.start.segmentCache.enable") + public boolean isLoadFromSegmentCacheEnabled() + { + return true; + } } diff --git a/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java b/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java index 61e9986f484..b184cf97395 100644 --- a/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java +++ b/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java @@ -29,7 +29,9 @@ import com.metamx.druid.index.QueryableIndexSegment; import com.metamx.druid.index.Segment; import org.apache.commons.io.FileUtils; -import java.io.*; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; /** */ diff --git a/server/src/main/java/com/metamx/druid/utils/DruidSetup.java b/server/src/main/java/com/metamx/druid/utils/DruidSetup.java index 88f26351991..ac7922506ac 100644 --- a/server/src/main/java/com/metamx/druid/utils/DruidSetup.java +++ b/server/src/main/java/com/metamx/druid/utils/DruidSetup.java @@ -169,6 +169,18 @@ public class DruidSetup { return zPathBase; } + + @Override + public int getSegmentsPerNode() + { + return 50; + } + + @Override + public long getMaxNumBytes() + { + return 1000; + } }; try { diff --git a/server/src/test/java/com/metamx/druid/coordination/ZkCoordinatorTest.java b/server/src/test/java/com/metamx/druid/coordination/ZkCoordinatorTest.java index 1a34c75cdbc..3f0ef6b03c6 100644 --- a/server/src/test/java/com/metamx/druid/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/com/metamx/druid/coordination/ZkCoordinatorTest.java @@ -27,6 +27,7 @@ import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.concurrent.Execs; import com.metamx.druid.curator.CuratorTestBase; +import com.metamx.druid.curator.SegmentReader; import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.index.v1.IndexIO; import com.metamx.druid.initialization.ZkPathsConfig; @@ -91,11 +92,26 @@ public class ZkCoordinatorTest extends CuratorTestBase { return "/druid"; } + + @Override + public int getSegmentsPerNode() + { + return 1; + } + + @Override + public long getMaxNumBytes() + { + return 1000; + } }; - announcer = new CuratorDataSegmentAnnouncer( - me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper + announcer = new BatchingCuratorDataSegmentAnnouncer( + me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper, new SegmentReader(curator, jsonMapper) ); + //announcer = new CuratorDataSegmentAnnouncer( + // me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper + //); zkCoordinator = new ZkCoordinator( jsonMapper, diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java index a497811e066..dafff35094f 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java @@ -130,6 +130,18 @@ public class DruidMasterTest { return ""; } + + @Override + public int getSegmentsPerNode() + { + return 1; + } + + @Override + public long getMaxNumBytes() + { + return 1000; + } }, null, databaseSegmentManager, From 5720b45b0a9aa89f128dee39e703de0353646951 Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 20 Jun 2013 16:21:08 -0700 Subject: [PATCH 2/2] tests --- .../BatchingCuratorDataSegmentAnnouncer.java | 5 +- ...tchingCuratorDataSegmentAnnouncerTest.java | 203 ++++++++++++++++++ 2 files changed, 204 insertions(+), 4 deletions(-) create mode 100644 client/src/test/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncerTest.java diff --git a/client/src/main/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncer.java index 68d9e2d11a6..06fea9edf27 100644 --- a/client/src/main/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncer.java +++ b/client/src/main/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncer.java @@ -19,14 +19,10 @@ package com.metamx.druid.coordination; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.metamx.common.ISE; -import com.metamx.common.lifecycle.LifecycleStart; -import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.curator.SegmentReader; @@ -108,6 +104,7 @@ public class BatchingCuratorDataSegmentAnnouncer extends AbstractDataSegmentAnno zkSegments.remove(segment); log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), path); + if (zkSegments.isEmpty()) { announcer.unannounce(path); } else if (zkSegments.size() < config.getSegmentsPerNode()) { diff --git a/client/src/test/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncerTest.java b/client/src/test/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncerTest.java new file mode 100644 index 00000000000..e1645c7444f --- /dev/null +++ b/client/src/test/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncerTest.java @@ -0,0 +1,203 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.coordination; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Joiner; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.MoreExecutors; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider; +import com.metamx.druid.curator.SegmentReader; +import com.metamx.druid.curator.announcement.Announcer; +import com.metamx.druid.initialization.ZkPathsConfig; +import com.metamx.druid.jackson.DefaultObjectMapper; +import junit.framework.Assert; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.TestingCluster; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +/** + */ +public class BatchingCuratorDataSegmentAnnouncerTest +{ + private static final String testBasePath = "/test"; + private static final String testSegmentsPath = "/test/segments/id"; + private static final Joiner joiner = Joiner.on("/"); + + private TestingCluster testingCluster; + private CuratorFramework cf; + private ObjectMapper jsonMapper; + private Announcer announcer; + private SegmentReader segmentReader; + private BatchingCuratorDataSegmentAnnouncer segmentAnnouncer; + private Set testSegments; + + @Before + public void setUp() throws Exception + { + testingCluster = new TestingCluster(1); + testingCluster.start(); + + cf = CuratorFrameworkFactory.builder() + .connectString(testingCluster.getConnectString()) + .retryPolicy(new ExponentialBackoffRetry(1, 10)) + .compressionProvider(new PotentiallyGzippedCompressionProvider(false)) + .build(); + cf.start(); + cf.create().creatingParentsIfNeeded().forPath(testBasePath); + + jsonMapper = new DefaultObjectMapper(); + + announcer = new Announcer( + cf, + MoreExecutors.sameThreadExecutor() + ); + announcer.start(); + + segmentReader = new SegmentReader(cf, jsonMapper); + segmentAnnouncer = new BatchingCuratorDataSegmentAnnouncer( + new DruidServerMetadata( + "id", + "host", + Long.MAX_VALUE, + "type", + "tier" + ), + new ZkPathsConfig() + { + @Override + public String getZkBasePath() + { + return testBasePath; + } + + @Override + public int getSegmentsPerNode() + { + return 50; + } + + @Override + public long getMaxNumBytes() + { + return 100000; + } + }, + announcer, + jsonMapper, + segmentReader + ); + segmentAnnouncer.start(); + + testSegments = Sets.newHashSet(); + for (int i = 0; i < 100; i++) { + testSegments.add(makeSegment(i)); + } + } + + @After + public void tearDown() throws Exception + { + segmentAnnouncer.stop(); + announcer.stop(); + cf.close(); + testingCluster.stop(); + } + + @Test + public void testSingleAnnounce() throws Exception + { + Iterator segIter = testSegments.iterator(); + DataSegment firstSegment = segIter.next(); + DataSegment secondSegment = segIter.next(); + + segmentAnnouncer.announceSegment(firstSegment); + + List zNodes = cf.getChildren().forPath(testSegmentsPath); + + for (String zNode : zNodes) { + Set segments = segmentReader.read(joiner.join(testSegmentsPath, zNode)); + Assert.assertEquals(segments.iterator().next(), firstSegment); + } + + segmentAnnouncer.announceSegment(secondSegment); + + for (String zNode : zNodes) { + Set segments = segmentReader.read(joiner.join(testSegmentsPath, zNode)); + Assert.assertEquals(Sets.newHashSet(firstSegment, secondSegment), segments); + } + + segmentAnnouncer.unannounceSegment(firstSegment); + + for (String zNode : zNodes) { + Set segments = segmentReader.read(joiner.join(testSegmentsPath, zNode)); + Assert.assertEquals(segments.iterator().next(), secondSegment); + } + + segmentAnnouncer.unannounceSegment(secondSegment); + + Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty()); + } + + @Test + public void testBatchAnnounce() throws Exception + { + segmentAnnouncer.announceSegments(testSegments); + + List zNodes = cf.getChildren().forPath(testSegmentsPath); + + Assert.assertTrue(zNodes.size() == 2); + + Set allSegments = Sets.newHashSet(); + for (String zNode : zNodes) { + allSegments.addAll(segmentReader.read(joiner.join(testSegmentsPath, zNode))); + } + Assert.assertEquals(allSegments, testSegments); + + segmentAnnouncer.unannounceSegments(testSegments); + + Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty()); + } + + private DataSegment makeSegment(int offset) + { + return DataSegment.builder() + .dataSource("foo") + .interval( + new Interval( + new DateTime("2013-01-01").plusDays(offset), + new DateTime("2013-01-02").plusDays(offset) + ) + ) + .version(new DateTime().toString()) + .build(); + } +}