From 9c145f5ce3483c68e24fa7ae92bc28a219c4c1c1 Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 19 Aug 2013 15:00:10 -0700 Subject: [PATCH] basic unit tets for realtime functinality --- .../druid/curator/announcement/Announcer.java | 9 +- .../druid/partition/PartitionHolder.java | 33 ++- .../druid/realtime/DbSegmentPublisher.java | 19 ++ .../realtime/DbSegmentPublisherConfig.java | 19 ++ .../druid/realtime/RealtimeManager.java | 11 +- .../druid/realtime/SegmentPublisher.java | 19 ++ .../IntervalStartVersioningPolicy.java | 19 ++ .../MessageTimeRejectionPolicyFactory.java | 19 ++ .../plumber/NoopRejectionPolicyFactory.java | 19 ++ .../realtime/plumber/RejectionPolicy.java | 19 ++ .../ServerTimeRejectionPolicyFactory.java | 19 ++ .../metamx/druid/realtime/plumber/Sink.java | 5 + .../realtime/plumber/VersioningPolicy.java | 19 ++ .../druid/realtime/RealtimeManagerTest.java | 252 ++++++++++++++++++ .../IntervalStartVersioningPolicyTest.java | 37 +++ ...MessageTimeRejectionPolicyFactoryTest.java | 40 +++ .../plumber/RealtimePlumberSchoolTest.java | 162 +++++++++++ .../ServerTimeRejectionPolicyFactoryTest.java | 41 +++ .../druid/realtime/plumber/SinkTest.java | 124 +++++++++ 19 files changed, 865 insertions(+), 20 deletions(-) create mode 100644 realtime/src/test/java/com/metamx/druid/realtime/RealtimeManagerTest.java create mode 100644 realtime/src/test/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicyTest.java create mode 100644 realtime/src/test/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactoryTest.java create mode 100644 realtime/src/test/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchoolTest.java create mode 100644 realtime/src/test/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactoryTest.java create mode 100644 realtime/src/test/java/com/metamx/druid/realtime/plumber/SinkTest.java diff --git a/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java b/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java index 727f7704771..abb96b76f68 100644 --- a/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java +++ b/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java @@ -320,12 +320,9 @@ public class Announcer final ConcurrentMap subPaths = announcements.get(parentPath); - if (subPaths == null) { - throw new IAE("Path[%s] not announced, cannot unannounce.", path); - } - - if (subPaths.remove(pathAndNode.getNode()) == null) { - throw new IAE("Path[%s] not announced, cannot unannounce.", path); + if (subPaths == null || subPaths.remove(pathAndNode.getNode()) == null) { + log.error("Path[%s] not announced, cannot unannounce.", path); + return; } try { diff --git a/common/src/main/java/com/metamx/druid/partition/PartitionHolder.java b/common/src/main/java/com/metamx/druid/partition/PartitionHolder.java index 79d1b5dd96e..be40ce9a63e 100644 --- a/common/src/main/java/com/metamx/druid/partition/PartitionHolder.java +++ b/common/src/main/java/com/metamx/druid/partition/PartitionHolder.java @@ -27,6 +27,7 @@ import com.google.common.collect.Sets; import java.util.Iterator; import java.util.List; +import java.util.SortedSet; import java.util.TreeSet; /** @@ -68,11 +69,16 @@ public class PartitionHolder implements Iterable> public PartitionChunk remove(PartitionChunk chunk) { - // Somewhat funky implementation in order to return the removed object as it exists in the set - PartitionChunk element = holderSet.tailSet(chunk, true).first(); - if (chunk.equals(element)) { - holderSet.remove(element); - return element; + if (!holderSet.isEmpty()) { + // Somewhat funky implementation in order to return the removed object as it exists in the set + SortedSet> tailSet = holderSet.tailSet(chunk, true); + if (!tailSet.isEmpty()) { + PartitionChunk element = tailSet.first(); + if (chunk.equals(element)) { + holderSet.remove(element); + return element; + } + } } return null; } @@ -110,16 +116,17 @@ public class PartitionHolder implements Iterable> return true; } - public PartitionChunk getChunk(final int partitionNum) { + public PartitionChunk getChunk(final int partitionNum) + { final Iterator> retVal = Iterators.filter( holderSet.iterator(), new Predicate>() - { - @Override - public boolean apply(PartitionChunk input) - { - return input.getChunkNumber() == partitionNum; - } - } + { + @Override + public boolean apply(PartitionChunk input) + { + return input.getChunkNumber() == partitionNum; + } + } ); return retVal.hasNext() ? retVal.next() : null; diff --git a/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisher.java b/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisher.java index c60723ece50..5af94da2fb3 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisher.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisher.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.realtime; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisherConfig.java b/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisherConfig.java index 5dcaccac49b..f174fd0a85f 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisherConfig.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisherConfig.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.realtime; import org.skife.config.Config; diff --git a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java index 55c8af27dcd..dc7bdc7960e 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java @@ -94,6 +94,10 @@ public class RealtimeManager implements QuerySegmentWalker Closeables.closeQuietly(chief); } } + public FireDepartmentMetrics getMetrics(String datasource) + { + return chiefs.get(datasource).getMetrics(); + } @Override public QueryRunner getQueryRunnerForIntervals(Query query, Iterable intervals) @@ -149,6 +153,11 @@ public class RealtimeManager implements QuerySegmentWalker } } + public FireDepartmentMetrics getMetrics() + { + return metrics; + } + @Override public void run() { @@ -186,11 +195,11 @@ public class RealtimeManager implements QuerySegmentWalker } int currCount = sink.add(inputRow); - metrics.incrementProcessed(); if (currCount >= config.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) { plumber.persist(firehose.commit()); nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); } + metrics.incrementProcessed(); } catch (FormattedException e) { log.info(e, "unparseable line: %s", e.getDetails()); diff --git a/realtime/src/main/java/com/metamx/druid/realtime/SegmentPublisher.java b/realtime/src/main/java/com/metamx/druid/realtime/SegmentPublisher.java index 48315849921..a381e5ab6a3 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/SegmentPublisher.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/SegmentPublisher.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.realtime; import com.metamx.druid.client.DataSegment; diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicy.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicy.java index 4ad3f123299..c052deeec98 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicy.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicy.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.realtime.plumber; import org.joda.time.Interval; diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactory.java index 117fa6a40eb..57ba07a76cc 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactory.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.realtime.plumber; import org.joda.time.DateTime; diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/NoopRejectionPolicyFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/NoopRejectionPolicyFactory.java index 59a3e24cb21..27bef8b8020 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/NoopRejectionPolicyFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/NoopRejectionPolicyFactory.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.realtime.plumber; import org.joda.time.DateTime; diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicy.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicy.java index 847c917dc35..61f4308a15a 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicy.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicy.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.realtime.plumber; import org.joda.time.DateTime; diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactory.java index 3557a8ba3bc..b97700699d4 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactory.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.realtime.plumber; import org.joda.time.DateTime; diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/Sink.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/Sink.java index a1823b6c09a..bc0bc194f99 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/Sink.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/Sink.java @@ -100,6 +100,11 @@ public class Sink implements Iterable return interval; } + public FireHydrant getCurrIndex() + { + return currIndex; + } + public int add(InputRow row) { if (currIndex == null) { diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/VersioningPolicy.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/VersioningPolicy.java index 5fe790dd284..36ab3830f6e 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/VersioningPolicy.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/VersioningPolicy.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.realtime.plumber; import com.fasterxml.jackson.annotation.JsonSubTypes; diff --git a/realtime/src/test/java/com/metamx/druid/realtime/RealtimeManagerTest.java b/realtime/src/test/java/com/metamx/druid/realtime/RealtimeManagerTest.java new file mode 100644 index 00000000000..1203a0e6ba4 --- /dev/null +++ b/realtime/src/test/java/com/metamx/druid/realtime/RealtimeManagerTest.java @@ -0,0 +1,252 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.realtime; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import com.metamx.common.ISE; +import com.metamx.druid.Query; +import com.metamx.druid.QueryGranularity; +import com.metamx.druid.aggregation.AggregatorFactory; +import com.metamx.druid.aggregation.CountAggregatorFactory; +import com.metamx.druid.guava.Runnables; +import com.metamx.druid.index.v1.SpatialDimensionSchema; +import com.metamx.druid.input.InputRow; +import com.metamx.druid.query.QueryRunner; +import com.metamx.druid.realtime.firehose.Firehose; +import com.metamx.druid.realtime.firehose.FirehoseFactory; +import com.metamx.druid.realtime.plumber.Plumber; +import com.metamx.druid.realtime.plumber.PlumberSchool; +import com.metamx.druid.realtime.plumber.Sink; +import com.metamx.druid.shard.NoneShardSpec; +import junit.framework.Assert; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.joda.time.Period; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + */ +public class RealtimeManagerTest +{ + private RealtimeManager realtimeManager; + private Schema schema; + private TestPlumber plumber; + + @Before + public void setUp() throws Exception + { + schema = new Schema( + "test", + Lists.newArrayList(), + new AggregatorFactory[]{new CountAggregatorFactory("rows")}, + QueryGranularity.NONE, + new NoneShardSpec() + ); + + final List rows = Arrays.asList( + makeRow(new DateTime("9000-01-01").getMillis()), makeRow(new DateTime().getMillis()) + ); + + plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, new DateTime().toString())); + + realtimeManager = new RealtimeManager( + Arrays.asList( + new FireDepartment( + schema, + new FireDepartmentConfig(1, new Period("P1Y")), + new FirehoseFactory() + { + @Override + public Firehose connect() throws IOException + { + return new TestFirehose(rows.iterator()); + } + }, + new PlumberSchool() + { + @Override + public Plumber findPlumber( + Schema schema, FireDepartmentMetrics metrics + ) + { + return plumber; + } + } + ) + ), + null + ); + } + + @Test + public void testRun() throws Exception + { + realtimeManager.start(); + + Stopwatch stopwatch = new Stopwatch().start(); + while (realtimeManager.getMetrics("test").processed() != 1) { + Thread.sleep(100); + if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) { + throw new ISE("Realtime manager should have completed processing 2 events!"); + } + } + + Assert.assertEquals(1, realtimeManager.getMetrics("test").processed()); + Assert.assertEquals(1, realtimeManager.getMetrics("test").thrownAway()); + Assert.assertTrue(plumber.isStartedJob()); + Assert.assertTrue(plumber.isFinishedJob()); + Assert.assertEquals(1, plumber.getPersistCount()); + } + + private InputRow makeRow(final long timestamp) + { + return new InputRow() + { + @Override + public List getDimensions() + { + return Arrays.asList("testDim"); + } + + @Override + public long getTimestampFromEpoch() + { + return timestamp; + } + + @Override + public List getDimension(String dimension) + { + return Lists.newArrayList(); + } + + @Override + public float getFloatMetric(String metric) + { + return 0; + } + }; + } + + + private static class TestFirehose implements Firehose + { + private final Iterator rows; + + private TestFirehose(Iterator rows) + { + this.rows = rows; + } + + @Override + public boolean hasMore() + { + return rows.hasNext(); + } + + @Override + public InputRow nextRow() + { + return rows.next(); + } + + @Override + public Runnable commit() + { + return Runnables.getNoopRunnable(); + } + + @Override + public void close() throws IOException + { + } + } + + private static class TestPlumber implements Plumber + { + private final Sink sink; + + + private volatile boolean startedJob = false; + private volatile boolean finishedJob = false; + private volatile int persistCount = 0; + + private TestPlumber(Sink sink) + { + this.sink = sink; + } + + private boolean isStartedJob() + { + return startedJob; + } + + private boolean isFinishedJob() + { + return finishedJob; + } + + private int getPersistCount() + { + return persistCount; + } + + @Override + public void startJob() + { + startedJob = true; + } + + @Override + public Sink getSink(long timestamp) + { + if (sink.getInterval().contains(timestamp)) { + return sink; + } + return null; + } + + @Override + public QueryRunner getQueryRunner(Query query) + { + throw new UnsupportedOperationException(); + } + + @Override + public void persist(Runnable commitRunnable) + { + persistCount++; + } + + @Override + public void finishJob() + { + finishedJob = true; + } + } +} diff --git a/realtime/src/test/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicyTest.java b/realtime/src/test/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicyTest.java new file mode 100644 index 00000000000..dcf92019428 --- /dev/null +++ b/realtime/src/test/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicyTest.java @@ -0,0 +1,37 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.realtime.plumber; + +import junit.framework.Assert; +import org.joda.time.Interval; +import org.junit.Test; + +/** + */ +public class IntervalStartVersioningPolicyTest +{ + @Test + public void testGetVersion() throws Exception + { + IntervalStartVersioningPolicy policy = new IntervalStartVersioningPolicy(); + String version = policy.getVersion(new Interval("2013-01-01/2013-01-02")); + Assert.assertEquals("2013-01-01T00:00:00.000Z", version); + } +} diff --git a/realtime/src/test/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactoryTest.java b/realtime/src/test/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactoryTest.java new file mode 100644 index 00000000000..c113cbf80ca --- /dev/null +++ b/realtime/src/test/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactoryTest.java @@ -0,0 +1,40 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.realtime.plumber; + +import junit.framework.Assert; +import org.joda.time.DateTime; +import org.joda.time.Period; +import org.junit.Test; + +/** + */ +public class MessageTimeRejectionPolicyFactoryTest +{ + @Test + public void testAccept() throws Exception + { + RejectionPolicy rejectionPolicy = new MessageTimeRejectionPolicyFactory().create( + new Period("PT10M") + ); + + Assert.assertTrue(rejectionPolicy.accept(new DateTime().getMillis())); + } +} diff --git a/realtime/src/test/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchoolTest.java b/realtime/src/test/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchoolTest.java new file mode 100644 index 00000000000..a93308c4fa6 --- /dev/null +++ b/realtime/src/test/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchoolTest.java @@ -0,0 +1,162 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.realtime.plumber; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.io.Files; +import com.metamx.common.ISE; +import com.metamx.druid.Query; +import com.metamx.druid.QueryGranularity; +import com.metamx.druid.aggregation.AggregatorFactory; +import com.metamx.druid.aggregation.CountAggregatorFactory; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.client.ServerView; +import com.metamx.druid.coordination.DataSegmentAnnouncer; +import com.metamx.druid.index.v1.IndexGranularity; +import com.metamx.druid.index.v1.SpatialDimensionSchema; +import com.metamx.druid.loading.DataSegmentPusher; +import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate; +import com.metamx.druid.query.QueryRunnerFactory; +import com.metamx.druid.realtime.FireDepartmentMetrics; +import com.metamx.druid.realtime.Schema; +import com.metamx.druid.realtime.SegmentPublisher; +import com.metamx.druid.shard.NoneShardSpec; +import com.metamx.emitter.service.ServiceEmitter; +import junit.framework.Assert; +import org.apache.commons.lang.mutable.MutableBoolean; +import org.easymock.EasyMock; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.joda.time.Period; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +/** + */ +public class RealtimePlumberSchoolTest +{ + private Plumber plumber; + + private DataSegmentAnnouncer announcer; + private SegmentPublisher segmentPublisher; + private DataSegmentPusher dataSegmentPusher; + private ServerView serverView; + private ServiceEmitter emitter; + + @Before + public void setUp() throws Exception + { + + final File tmpDir = Files.createTempDir(); + tmpDir.deleteOnExit(); + + final Schema schema = new Schema( + "test", + Lists.newArrayList(), + new AggregatorFactory[]{new CountAggregatorFactory("rows")}, + QueryGranularity.NONE, + new NoneShardSpec() + ); + + RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool( + new Period("PT10m"), + tmpDir, + IndexGranularity.HOUR + ); + + announcer = EasyMock.createMock(DataSegmentAnnouncer.class); + announcer.announceSegment(EasyMock.anyObject()); + EasyMock.expectLastCall().anyTimes(); + + segmentPublisher = EasyMock.createMock(SegmentPublisher.class); + dataSegmentPusher = EasyMock.createMock(DataSegmentPusher.class); + + serverView = EasyMock.createMock(ServerView.class); + serverView.registerSegmentCallback( + EasyMock.anyObject(), + EasyMock.anyObject() + ); + EasyMock.expectLastCall().anyTimes(); + + emitter = EasyMock.createMock(ServiceEmitter.class); + + EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter); + + realtimePlumberSchool.setConglomerate(new DefaultQueryRunnerFactoryConglomerate(Maps., QueryRunnerFactory>newHashMap())); + realtimePlumberSchool.setSegmentAnnouncer(announcer); + realtimePlumberSchool.setSegmentPublisher(segmentPublisher); + realtimePlumberSchool.setRejectionPolicyFactory(new NoopRejectionPolicyFactory()); + realtimePlumberSchool.setVersioningPolicy(new IntervalStartVersioningPolicy()); + realtimePlumberSchool.setDataSegmentPusher(dataSegmentPusher); + realtimePlumberSchool.setServerView(serverView); + realtimePlumberSchool.setServiceEmitter(emitter); + + plumber = realtimePlumberSchool.findPlumber(schema, new FireDepartmentMetrics()); + } + + @After + public void tearDown() throws Exception + { + EasyMock.verify(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter); + } + + @Test + public void testGetSink() throws Exception + { + final DateTime theTime = new DateTime("2013-01-01"); + Sink sink = plumber.getSink(theTime.getMillis()); + + Assert.assertEquals(new Interval(String.format("%s/PT1H", theTime.toString())), sink.getInterval()); + Assert.assertEquals(theTime.toString(), sink.getVersion()); + } + + @Test + public void testPersist() throws Exception + { + final MutableBoolean committed = new MutableBoolean(false); + plumber.startJob(); + plumber.persist( + new Runnable() + { + @Override + public void run() + { + committed.setValue(true); + } + } + ); + + Stopwatch stopwatch = new Stopwatch().start(); + while (!committed.booleanValue()) { + Thread.sleep(100); + if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) { + throw new ISE("Taking too long to set perist value"); + } + } + plumber.finishJob(); + } +} diff --git a/realtime/src/test/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactoryTest.java b/realtime/src/test/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactoryTest.java new file mode 100644 index 00000000000..c262e345895 --- /dev/null +++ b/realtime/src/test/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactoryTest.java @@ -0,0 +1,41 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.realtime.plumber; + +import junit.framework.Assert; +import org.joda.time.DateTime; +import org.joda.time.Period; +import org.junit.Test; + +/** + */ +public class ServerTimeRejectionPolicyFactoryTest +{ + @Test + public void testAccept() throws Exception + { + RejectionPolicy rejectionPolicy = new ServerTimeRejectionPolicyFactory().create( + new Period("PT10M") + ); + + Assert.assertTrue(rejectionPolicy.accept(new DateTime().getMillis())); + Assert.assertFalse(rejectionPolicy.accept(new DateTime("2000").getMillis())); + } +} diff --git a/realtime/src/test/java/com/metamx/druid/realtime/plumber/SinkTest.java b/realtime/src/test/java/com/metamx/druid/realtime/plumber/SinkTest.java new file mode 100644 index 00000000000..27b06609a0d --- /dev/null +++ b/realtime/src/test/java/com/metamx/druid/realtime/plumber/SinkTest.java @@ -0,0 +1,124 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.realtime.plumber; + +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.metamx.druid.QueryGranularity; +import com.metamx.druid.aggregation.AggregatorFactory; +import com.metamx.druid.aggregation.CountAggregatorFactory; +import com.metamx.druid.index.v1.SpatialDimensionSchema; +import com.metamx.druid.input.InputRow; +import com.metamx.druid.realtime.FireHydrant; +import com.metamx.druid.realtime.Schema; +import com.metamx.druid.shard.NoneShardSpec; +import junit.framework.Assert; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.Test; + +import java.util.List; + +/** + */ +public class SinkTest +{ + @Test + public void testSwap() throws Exception + { + final Schema schema = new Schema( + "test", + Lists.newArrayList(), + new AggregatorFactory[]{new CountAggregatorFactory("rows")}, + QueryGranularity.MINUTE, + new NoneShardSpec() + ); + + final Interval interval = new Interval("2013-01-01/2013-01-02"); + final String version = new DateTime().toString(); + final Sink sink = new Sink(interval, schema, version); + + sink.add(new InputRow() + { + @Override + public List getDimensions() + { + return Lists.newArrayList(); + } + + @Override + public long getTimestampFromEpoch() + { + return new DateTime("2013-01-01").getMillis(); + } + + @Override + public List getDimension(String dimension) + { + return Lists.newArrayList(); + } + + @Override + public float getFloatMetric(String metric) + { + return 0; + } + }); + + FireHydrant currHydrant = sink.getCurrIndex(); + Assert.assertEquals(new Interval("2013-01-01/PT1M"), currHydrant.getIndex().getInterval()); + + + FireHydrant swapHydrant = sink.swap(); + + sink.add(new InputRow() + { + @Override + public List getDimensions() + { + return Lists.newArrayList(); + } + + @Override + public long getTimestampFromEpoch() + { + return new DateTime("2013-01-01").getMillis(); + } + + @Override + public List getDimension(String dimension) + { + return Lists.newArrayList(); + } + + @Override + public float getFloatMetric(String metric) + { + return 0; + } + }); + + Assert.assertEquals(currHydrant, swapHydrant); + Assert.assertNotSame(currHydrant, sink.getCurrIndex()); + Assert.assertEquals(new Interval("2013-01-01/PT1M"), sink.getCurrIndex().getIndex().getInterval()); + + Assert.assertEquals(2, Iterators.size(sink.iterator())); + } +}