Merge pull request #222 from metamx/rt-ut

Basic unit tests for realtime functionality
This commit is contained in:
cheddar 2013-08-21 13:33:39 -07:00
commit 3bcee8cdb3
19 changed files with 878 additions and 20 deletions

View File

@ -320,12 +320,9 @@ public class Announcer
final ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath);
if (subPaths == null) {
throw new IAE("Path[%s] not announced, cannot unannounce.", path);
}
if (subPaths.remove(pathAndNode.getNode()) == null) {
throw new IAE("Path[%s] not announced, cannot unannounce.", path);
if (subPaths == null || subPaths.remove(pathAndNode.getNode()) == null) {
log.error("Path[%s] not announced, cannot unannounce.", path);
return;
}
try {

View File

@ -27,6 +27,7 @@ import com.google.common.collect.Sets;
import java.util.Iterator;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
/**
@ -68,11 +69,16 @@ public class PartitionHolder<T> implements Iterable<PartitionChunk<T>>
public PartitionChunk<T> remove(PartitionChunk<T> chunk)
{
// Somewhat funky implementation in order to return the removed object as it exists in the set
PartitionChunk<T> element = holderSet.tailSet(chunk, true).first();
if (chunk.equals(element)) {
holderSet.remove(element);
return element;
if (!holderSet.isEmpty()) {
// Somewhat funky implementation in order to return the removed object as it exists in the set
SortedSet<PartitionChunk<T>> tailSet = holderSet.tailSet(chunk, true);
if (!tailSet.isEmpty()) {
PartitionChunk<T> element = tailSet.first();
if (chunk.equals(element)) {
holderSet.remove(element);
return element;
}
}
}
return null;
}
@ -110,16 +116,17 @@ public class PartitionHolder<T> implements Iterable<PartitionChunk<T>>
return true;
}
public PartitionChunk<T> getChunk(final int partitionNum) {
public PartitionChunk<T> getChunk(final int partitionNum)
{
final Iterator<PartitionChunk<T>> retVal = Iterators.filter(
holderSet.iterator(), new Predicate<PartitionChunk<T>>()
{
@Override
public boolean apply(PartitionChunk<T> input)
{
return input.getChunkNumber() == partitionNum;
}
}
{
@Override
public boolean apply(PartitionChunk<T> input)
{
return input.getChunkNumber() == partitionNum;
}
}
);
return retVal.hasNext() ? retVal.next() : null;

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime;
import com.fasterxml.jackson.databind.ObjectMapper;

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime;
import org.skife.config.Config;

View File

@ -94,6 +94,14 @@ public class RealtimeManager implements QuerySegmentWalker
Closeables.closeQuietly(chief);
}
}
public FireDepartmentMetrics getMetrics(String datasource)
{
FireChief chief = chiefs.get(datasource);
if (chief == null) {
return null;
}
return chief.getMetrics();
}
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
@ -149,6 +157,11 @@ public class RealtimeManager implements QuerySegmentWalker
}
}
public FireDepartmentMetrics getMetrics()
{
return metrics;
}
@Override
public void run()
{
@ -186,11 +199,11 @@ public class RealtimeManager implements QuerySegmentWalker
}
int currCount = sink.add(inputRow);
metrics.incrementProcessed();
if (currCount >= config.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
metrics.incrementProcessed();
}
catch (FormattedException e) {
log.info(e, "unparseable line: %s", e.getDetails());

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime;
import com.metamx.druid.client.DataSegment;

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber;
import org.joda.time.Interval;

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber;
import org.joda.time.DateTime;

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber;
import org.joda.time.DateTime;

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber;
import org.joda.time.DateTime;

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber;
import org.joda.time.DateTime;

View File

@ -100,6 +100,11 @@ public class Sink implements Iterable<FireHydrant>
return interval;
}
public FireHydrant getCurrIndex()
{
return currIndex;
}
public int add(InputRow row)
{
if (currIndex == null) {

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber;
import com.fasterxml.jackson.annotation.JsonSubTypes;

View File

@ -0,0 +1,252 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.druid.Query;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.CountAggregatorFactory;
import com.metamx.druid.guava.Runnables;
import com.metamx.druid.index.v1.SpatialDimensionSchema;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.PlumberSchool;
import com.metamx.druid.realtime.plumber.Sink;
import com.metamx.druid.shard.NoneShardSpec;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
*/
public class RealtimeManagerTest
{
private RealtimeManager realtimeManager;
private Schema schema;
private TestPlumber plumber;
@Before
public void setUp() throws Exception
{
schema = new Schema(
"test",
Lists.<SpatialDimensionSchema>newArrayList(),
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
QueryGranularity.NONE,
new NoneShardSpec()
);
final List<InputRow> rows = Arrays.asList(
makeRow(new DateTime("9000-01-01").getMillis()), makeRow(new DateTime().getMillis())
);
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, new DateTime().toString()));
realtimeManager = new RealtimeManager(
Arrays.<FireDepartment>asList(
new FireDepartment(
schema,
new FireDepartmentConfig(1, new Period("P1Y")),
new FirehoseFactory()
{
@Override
public Firehose connect() throws IOException
{
return new TestFirehose(rows.iterator());
}
},
new PlumberSchool()
{
@Override
public Plumber findPlumber(
Schema schema, FireDepartmentMetrics metrics
)
{
return plumber;
}
}
)
),
null
);
}
@Test
public void testRun() throws Exception
{
realtimeManager.start();
Stopwatch stopwatch = new Stopwatch().start();
while (realtimeManager.getMetrics("test").processed() != 1) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Realtime manager should have completed processing 2 events!");
}
}
Assert.assertEquals(1, realtimeManager.getMetrics("test").processed());
Assert.assertEquals(1, realtimeManager.getMetrics("test").thrownAway());
Assert.assertTrue(plumber.isStartedJob());
Assert.assertTrue(plumber.isFinishedJob());
Assert.assertEquals(1, plumber.getPersistCount());
}
private InputRow makeRow(final long timestamp)
{
return new InputRow()
{
@Override
public List<String> getDimensions()
{
return Arrays.asList("testDim");
}
@Override
public long getTimestampFromEpoch()
{
return timestamp;
}
@Override
public List<String> getDimension(String dimension)
{
return Lists.newArrayList();
}
@Override
public float getFloatMetric(String metric)
{
return 0;
}
};
}
private static class TestFirehose implements Firehose
{
private final Iterator<InputRow> rows;
private TestFirehose(Iterator<InputRow> rows)
{
this.rows = rows;
}
@Override
public boolean hasMore()
{
return rows.hasNext();
}
@Override
public InputRow nextRow()
{
return rows.next();
}
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override
public void close() throws IOException
{
}
}
private static class TestPlumber implements Plumber
{
private final Sink sink;
private volatile boolean startedJob = false;
private volatile boolean finishedJob = false;
private volatile int persistCount = 0;
private TestPlumber(Sink sink)
{
this.sink = sink;
}
private boolean isStartedJob()
{
return startedJob;
}
private boolean isFinishedJob()
{
return finishedJob;
}
private int getPersistCount()
{
return persistCount;
}
@Override
public void startJob()
{
startedJob = true;
}
@Override
public Sink getSink(long timestamp)
{
if (sink.getInterval().contains(timestamp)) {
return sink;
}
return null;
}
@Override
public <T> QueryRunner<T> getQueryRunner(Query<T> query)
{
throw new UnsupportedOperationException();
}
@Override
public void persist(Runnable commitRunnable)
{
persistCount++;
}
@Override
public void finishJob()
{
finishedJob = true;
}
}
}

View File

@ -0,0 +1,37 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber;
import junit.framework.Assert;
import org.joda.time.Interval;
import org.junit.Test;
/**
*/
public class IntervalStartVersioningPolicyTest
{
@Test
public void testGetVersion() throws Exception
{
IntervalStartVersioningPolicy policy = new IntervalStartVersioningPolicy();
String version = policy.getVersion(new Interval("2013-01-01/2013-01-02"));
Assert.assertEquals("2013-01-01T00:00:00.000Z", version);
}
}

View File

@ -0,0 +1,46 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.Test;
/**
*/
public class MessageTimeRejectionPolicyFactoryTest
{
@Test
public void testAccept() throws Exception
{
Period period = new Period("PT10M");
RejectionPolicy rejectionPolicy = new MessageTimeRejectionPolicyFactory().create(period);
DateTime now = new DateTime();
DateTime past = now.minus(period).minus(1);
DateTime future = now.plus(period).plus(1);
Assert.assertTrue(rejectionPolicy.accept(now.getMillis()));
Assert.assertFalse(rejectionPolicy.accept(past.getMillis()));
Assert.assertTrue(rejectionPolicy.accept(future.getMillis()));
Assert.assertFalse(rejectionPolicy.accept(now.getMillis()));
}
}

View File

@ -0,0 +1,162 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.metamx.common.ISE;
import com.metamx.druid.Query;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.CountAggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.ServerView;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.index.v1.IndexGranularity;
import com.metamx.druid.index.v1.SpatialDimensionSchema;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.SegmentPublisher;
import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.emitter.service.ServiceEmitter;
import junit.framework.Assert;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
*/
public class RealtimePlumberSchoolTest
{
private Plumber plumber;
private DataSegmentAnnouncer announcer;
private SegmentPublisher segmentPublisher;
private DataSegmentPusher dataSegmentPusher;
private ServerView serverView;
private ServiceEmitter emitter;
@Before
public void setUp() throws Exception
{
final File tmpDir = Files.createTempDir();
tmpDir.deleteOnExit();
final Schema schema = new Schema(
"test",
Lists.<SpatialDimensionSchema>newArrayList(),
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
QueryGranularity.NONE,
new NoneShardSpec()
);
RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(
new Period("PT10m"),
tmpDir,
IndexGranularity.HOUR
);
announcer = EasyMock.createMock(DataSegmentAnnouncer.class);
announcer.announceSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().anyTimes();
segmentPublisher = EasyMock.createMock(SegmentPublisher.class);
dataSegmentPusher = EasyMock.createMock(DataSegmentPusher.class);
serverView = EasyMock.createMock(ServerView.class);
serverView.registerSegmentCallback(
EasyMock.<Executor>anyObject(),
EasyMock.<ServerView.SegmentCallback>anyObject()
);
EasyMock.expectLastCall().anyTimes();
emitter = EasyMock.createMock(ServiceEmitter.class);
EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter);
realtimePlumberSchool.setConglomerate(new DefaultQueryRunnerFactoryConglomerate(Maps.<Class<? extends Query>, QueryRunnerFactory>newHashMap()));
realtimePlumberSchool.setSegmentAnnouncer(announcer);
realtimePlumberSchool.setSegmentPublisher(segmentPublisher);
realtimePlumberSchool.setRejectionPolicyFactory(new NoopRejectionPolicyFactory());
realtimePlumberSchool.setVersioningPolicy(new IntervalStartVersioningPolicy());
realtimePlumberSchool.setDataSegmentPusher(dataSegmentPusher);
realtimePlumberSchool.setServerView(serverView);
realtimePlumberSchool.setServiceEmitter(emitter);
plumber = realtimePlumberSchool.findPlumber(schema, new FireDepartmentMetrics());
}
@After
public void tearDown() throws Exception
{
EasyMock.verify(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter);
}
@Test
public void testGetSink() throws Exception
{
final DateTime theTime = new DateTime("2013-01-01");
Sink sink = plumber.getSink(theTime.getMillis());
Assert.assertEquals(new Interval(String.format("%s/PT1H", theTime.toString())), sink.getInterval());
Assert.assertEquals(theTime.toString(), sink.getVersion());
}
@Test
public void testPersist() throws Exception
{
final MutableBoolean committed = new MutableBoolean(false);
plumber.startJob();
plumber.persist(
new Runnable()
{
@Override
public void run()
{
committed.setValue(true);
}
}
);
Stopwatch stopwatch = new Stopwatch().start();
while (!committed.booleanValue()) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Taking too long to set perist value");
}
}
plumber.finishJob();
}
}

View File

@ -0,0 +1,44 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.Test;
/**
*/
public class ServerTimeRejectionPolicyFactoryTest
{
@Test
public void testAccept() throws Exception
{
Period period = new Period("PT10M");
RejectionPolicy rejectionPolicy = new ServerTimeRejectionPolicyFactory().create(period);
DateTime now = new DateTime();
DateTime past = now.minus(period).minus(1);
Assert.assertTrue(rejectionPolicy.accept(now.getMillis()));
Assert.assertFalse(rejectionPolicy.accept(past.getMillis()));
}
}

View File

@ -0,0 +1,124 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.CountAggregatorFactory;
import com.metamx.druid.index.v1.SpatialDimensionSchema;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.realtime.FireHydrant;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.shard.NoneShardSpec;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Test;
import java.util.List;
/**
*/
public class SinkTest
{
@Test
public void testSwap() throws Exception
{
final Schema schema = new Schema(
"test",
Lists.<SpatialDimensionSchema>newArrayList(),
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
QueryGranularity.MINUTE,
new NoneShardSpec()
);
final Interval interval = new Interval("2013-01-01/2013-01-02");
final String version = new DateTime().toString();
final Sink sink = new Sink(interval, schema, version);
sink.add(new InputRow()
{
@Override
public List<String> getDimensions()
{
return Lists.newArrayList();
}
@Override
public long getTimestampFromEpoch()
{
return new DateTime("2013-01-01").getMillis();
}
@Override
public List<String> getDimension(String dimension)
{
return Lists.newArrayList();
}
@Override
public float getFloatMetric(String metric)
{
return 0;
}
});
FireHydrant currHydrant = sink.getCurrIndex();
Assert.assertEquals(new Interval("2013-01-01/PT1M"), currHydrant.getIndex().getInterval());
FireHydrant swapHydrant = sink.swap();
sink.add(new InputRow()
{
@Override
public List<String> getDimensions()
{
return Lists.newArrayList();
}
@Override
public long getTimestampFromEpoch()
{
return new DateTime("2013-01-01").getMillis();
}
@Override
public List<String> getDimension(String dimension)
{
return Lists.newArrayList();
}
@Override
public float getFloatMetric(String metric)
{
return 0;
}
});
Assert.assertEquals(currHydrant, swapHydrant);
Assert.assertNotSame(currHydrant, sink.getCurrIndex());
Assert.assertEquals(new Interval("2013-01-01/PT1M"), sink.getCurrIndex().getIndex().getInterval());
Assert.assertEquals(2, Iterators.size(sink.iterator()));
}
}