basic unit tets for realtime functinality

This commit is contained in:
fjy 2013-08-19 15:00:10 -07:00
parent ddcc1b53e0
commit 9c145f5ce3
19 changed files with 865 additions and 20 deletions

View File

@ -320,12 +320,9 @@ public class Announcer
final ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath); final ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath);
if (subPaths == null) { if (subPaths == null || subPaths.remove(pathAndNode.getNode()) == null) {
throw new IAE("Path[%s] not announced, cannot unannounce.", path); log.error("Path[%s] not announced, cannot unannounce.", path);
} return;
if (subPaths.remove(pathAndNode.getNode()) == null) {
throw new IAE("Path[%s] not announced, cannot unannounce.", path);
} }
try { try {

View File

@ -27,6 +27,7 @@ import com.google.common.collect.Sets;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet; import java.util.TreeSet;
/** /**
@ -68,11 +69,16 @@ public class PartitionHolder<T> implements Iterable<PartitionChunk<T>>
public PartitionChunk<T> remove(PartitionChunk<T> chunk) public PartitionChunk<T> remove(PartitionChunk<T> chunk)
{ {
// Somewhat funky implementation in order to return the removed object as it exists in the set if (!holderSet.isEmpty()) {
PartitionChunk<T> element = holderSet.tailSet(chunk, true).first(); // Somewhat funky implementation in order to return the removed object as it exists in the set
if (chunk.equals(element)) { SortedSet<PartitionChunk<T>> tailSet = holderSet.tailSet(chunk, true);
holderSet.remove(element); if (!tailSet.isEmpty()) {
return element; PartitionChunk<T> element = tailSet.first();
if (chunk.equals(element)) {
holderSet.remove(element);
return element;
}
}
} }
return null; return null;
} }
@ -110,16 +116,17 @@ public class PartitionHolder<T> implements Iterable<PartitionChunk<T>>
return true; return true;
} }
public PartitionChunk<T> getChunk(final int partitionNum) { public PartitionChunk<T> getChunk(final int partitionNum)
{
final Iterator<PartitionChunk<T>> retVal = Iterators.filter( final Iterator<PartitionChunk<T>> retVal = Iterators.filter(
holderSet.iterator(), new Predicate<PartitionChunk<T>>() holderSet.iterator(), new Predicate<PartitionChunk<T>>()
{ {
@Override @Override
public boolean apply(PartitionChunk<T> input) public boolean apply(PartitionChunk<T> input)
{ {
return input.getChunkNumber() == partitionNum; return input.getChunkNumber() == partitionNum;
} }
} }
); );
return retVal.hasNext() ? retVal.next() : null; return retVal.hasNext() ? retVal.next() : null;

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime; package com.metamx.druid.realtime;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime; package com.metamx.druid.realtime;
import org.skife.config.Config; import org.skife.config.Config;

View File

@ -94,6 +94,10 @@ public class RealtimeManager implements QuerySegmentWalker
Closeables.closeQuietly(chief); Closeables.closeQuietly(chief);
} }
} }
public FireDepartmentMetrics getMetrics(String datasource)
{
return chiefs.get(datasource).getMetrics();
}
@Override @Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals) public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
@ -149,6 +153,11 @@ public class RealtimeManager implements QuerySegmentWalker
} }
} }
public FireDepartmentMetrics getMetrics()
{
return metrics;
}
@Override @Override
public void run() public void run()
{ {
@ -186,11 +195,11 @@ public class RealtimeManager implements QuerySegmentWalker
} }
int currCount = sink.add(inputRow); int currCount = sink.add(inputRow);
metrics.incrementProcessed();
if (currCount >= config.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) { if (currCount >= config.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit()); plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
} }
metrics.incrementProcessed();
} }
catch (FormattedException e) { catch (FormattedException e) {
log.info(e, "unparseable line: %s", e.getDetails()); log.info(e, "unparseable line: %s", e.getDetails());

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime; package com.metamx.druid.realtime;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber; package com.metamx.druid.realtime.plumber;
import org.joda.time.Interval; import org.joda.time.Interval;

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber; package com.metamx.druid.realtime.plumber;
import org.joda.time.DateTime; import org.joda.time.DateTime;

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber; package com.metamx.druid.realtime.plumber;
import org.joda.time.DateTime; import org.joda.time.DateTime;

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber; package com.metamx.druid.realtime.plumber;
import org.joda.time.DateTime; import org.joda.time.DateTime;

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber; package com.metamx.druid.realtime.plumber;
import org.joda.time.DateTime; import org.joda.time.DateTime;

View File

@ -100,6 +100,11 @@ public class Sink implements Iterable<FireHydrant>
return interval; return interval;
} }
public FireHydrant getCurrIndex()
{
return currIndex;
}
public int add(InputRow row) public int add(InputRow row)
{ {
if (currIndex == null) { if (currIndex == null) {

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber; package com.metamx.druid.realtime.plumber;
import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes;

View File

@ -0,0 +1,252 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.druid.Query;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.CountAggregatorFactory;
import com.metamx.druid.guava.Runnables;
import com.metamx.druid.index.v1.SpatialDimensionSchema;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.PlumberSchool;
import com.metamx.druid.realtime.plumber.Sink;
import com.metamx.druid.shard.NoneShardSpec;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
*/
public class RealtimeManagerTest
{
private RealtimeManager realtimeManager;
private Schema schema;
private TestPlumber plumber;
@Before
public void setUp() throws Exception
{
schema = new Schema(
"test",
Lists.<SpatialDimensionSchema>newArrayList(),
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
QueryGranularity.NONE,
new NoneShardSpec()
);
final List<InputRow> rows = Arrays.asList(
makeRow(new DateTime("9000-01-01").getMillis()), makeRow(new DateTime().getMillis())
);
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, new DateTime().toString()));
realtimeManager = new RealtimeManager(
Arrays.<FireDepartment>asList(
new FireDepartment(
schema,
new FireDepartmentConfig(1, new Period("P1Y")),
new FirehoseFactory()
{
@Override
public Firehose connect() throws IOException
{
return new TestFirehose(rows.iterator());
}
},
new PlumberSchool()
{
@Override
public Plumber findPlumber(
Schema schema, FireDepartmentMetrics metrics
)
{
return plumber;
}
}
)
),
null
);
}
@Test
public void testRun() throws Exception
{
realtimeManager.start();
Stopwatch stopwatch = new Stopwatch().start();
while (realtimeManager.getMetrics("test").processed() != 1) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Realtime manager should have completed processing 2 events!");
}
}
Assert.assertEquals(1, realtimeManager.getMetrics("test").processed());
Assert.assertEquals(1, realtimeManager.getMetrics("test").thrownAway());
Assert.assertTrue(plumber.isStartedJob());
Assert.assertTrue(plumber.isFinishedJob());
Assert.assertEquals(1, plumber.getPersistCount());
}
private InputRow makeRow(final long timestamp)
{
return new InputRow()
{
@Override
public List<String> getDimensions()
{
return Arrays.asList("testDim");
}
@Override
public long getTimestampFromEpoch()
{
return timestamp;
}
@Override
public List<String> getDimension(String dimension)
{
return Lists.newArrayList();
}
@Override
public float getFloatMetric(String metric)
{
return 0;
}
};
}
private static class TestFirehose implements Firehose
{
private final Iterator<InputRow> rows;
private TestFirehose(Iterator<InputRow> rows)
{
this.rows = rows;
}
@Override
public boolean hasMore()
{
return rows.hasNext();
}
@Override
public InputRow nextRow()
{
return rows.next();
}
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override
public void close() throws IOException
{
}
}
private static class TestPlumber implements Plumber
{
private final Sink sink;
private volatile boolean startedJob = false;
private volatile boolean finishedJob = false;
private volatile int persistCount = 0;
private TestPlumber(Sink sink)
{
this.sink = sink;
}
private boolean isStartedJob()
{
return startedJob;
}
private boolean isFinishedJob()
{
return finishedJob;
}
private int getPersistCount()
{
return persistCount;
}
@Override
public void startJob()
{
startedJob = true;
}
@Override
public Sink getSink(long timestamp)
{
if (sink.getInterval().contains(timestamp)) {
return sink;
}
return null;
}
@Override
public <T> QueryRunner<T> getQueryRunner(Query<T> query)
{
throw new UnsupportedOperationException();
}
@Override
public void persist(Runnable commitRunnable)
{
persistCount++;
}
@Override
public void finishJob()
{
finishedJob = true;
}
}
}

View File

@ -0,0 +1,37 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber;
import junit.framework.Assert;
import org.joda.time.Interval;
import org.junit.Test;
/**
*/
public class IntervalStartVersioningPolicyTest
{
@Test
public void testGetVersion() throws Exception
{
IntervalStartVersioningPolicy policy = new IntervalStartVersioningPolicy();
String version = policy.getVersion(new Interval("2013-01-01/2013-01-02"));
Assert.assertEquals("2013-01-01T00:00:00.000Z", version);
}
}

View File

@ -0,0 +1,40 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.Test;
/**
*/
public class MessageTimeRejectionPolicyFactoryTest
{
@Test
public void testAccept() throws Exception
{
RejectionPolicy rejectionPolicy = new MessageTimeRejectionPolicyFactory().create(
new Period("PT10M")
);
Assert.assertTrue(rejectionPolicy.accept(new DateTime().getMillis()));
}
}

View File

@ -0,0 +1,162 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.metamx.common.ISE;
import com.metamx.druid.Query;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.CountAggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.ServerView;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.index.v1.IndexGranularity;
import com.metamx.druid.index.v1.SpatialDimensionSchema;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.SegmentPublisher;
import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.emitter.service.ServiceEmitter;
import junit.framework.Assert;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
*/
public class RealtimePlumberSchoolTest
{
private Plumber plumber;
private DataSegmentAnnouncer announcer;
private SegmentPublisher segmentPublisher;
private DataSegmentPusher dataSegmentPusher;
private ServerView serverView;
private ServiceEmitter emitter;
@Before
public void setUp() throws Exception
{
final File tmpDir = Files.createTempDir();
tmpDir.deleteOnExit();
final Schema schema = new Schema(
"test",
Lists.<SpatialDimensionSchema>newArrayList(),
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
QueryGranularity.NONE,
new NoneShardSpec()
);
RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(
new Period("PT10m"),
tmpDir,
IndexGranularity.HOUR
);
announcer = EasyMock.createMock(DataSegmentAnnouncer.class);
announcer.announceSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().anyTimes();
segmentPublisher = EasyMock.createMock(SegmentPublisher.class);
dataSegmentPusher = EasyMock.createMock(DataSegmentPusher.class);
serverView = EasyMock.createMock(ServerView.class);
serverView.registerSegmentCallback(
EasyMock.<Executor>anyObject(),
EasyMock.<ServerView.SegmentCallback>anyObject()
);
EasyMock.expectLastCall().anyTimes();
emitter = EasyMock.createMock(ServiceEmitter.class);
EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter);
realtimePlumberSchool.setConglomerate(new DefaultQueryRunnerFactoryConglomerate(Maps.<Class<? extends Query>, QueryRunnerFactory>newHashMap()));
realtimePlumberSchool.setSegmentAnnouncer(announcer);
realtimePlumberSchool.setSegmentPublisher(segmentPublisher);
realtimePlumberSchool.setRejectionPolicyFactory(new NoopRejectionPolicyFactory());
realtimePlumberSchool.setVersioningPolicy(new IntervalStartVersioningPolicy());
realtimePlumberSchool.setDataSegmentPusher(dataSegmentPusher);
realtimePlumberSchool.setServerView(serverView);
realtimePlumberSchool.setServiceEmitter(emitter);
plumber = realtimePlumberSchool.findPlumber(schema, new FireDepartmentMetrics());
}
@After
public void tearDown() throws Exception
{
EasyMock.verify(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter);
}
@Test
public void testGetSink() throws Exception
{
final DateTime theTime = new DateTime("2013-01-01");
Sink sink = plumber.getSink(theTime.getMillis());
Assert.assertEquals(new Interval(String.format("%s/PT1H", theTime.toString())), sink.getInterval());
Assert.assertEquals(theTime.toString(), sink.getVersion());
}
@Test
public void testPersist() throws Exception
{
final MutableBoolean committed = new MutableBoolean(false);
plumber.startJob();
plumber.persist(
new Runnable()
{
@Override
public void run()
{
committed.setValue(true);
}
}
);
Stopwatch stopwatch = new Stopwatch().start();
while (!committed.booleanValue()) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Taking too long to set perist value");
}
}
plumber.finishJob();
}
}

View File

@ -0,0 +1,41 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.Test;
/**
*/
public class ServerTimeRejectionPolicyFactoryTest
{
@Test
public void testAccept() throws Exception
{
RejectionPolicy rejectionPolicy = new ServerTimeRejectionPolicyFactory().create(
new Period("PT10M")
);
Assert.assertTrue(rejectionPolicy.accept(new DateTime().getMillis()));
Assert.assertFalse(rejectionPolicy.accept(new DateTime("2000").getMillis()));
}
}

View File

@ -0,0 +1,124 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.plumber;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.CountAggregatorFactory;
import com.metamx.druid.index.v1.SpatialDimensionSchema;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.realtime.FireHydrant;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.shard.NoneShardSpec;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Test;
import java.util.List;
/**
*/
public class SinkTest
{
@Test
public void testSwap() throws Exception
{
final Schema schema = new Schema(
"test",
Lists.<SpatialDimensionSchema>newArrayList(),
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
QueryGranularity.MINUTE,
new NoneShardSpec()
);
final Interval interval = new Interval("2013-01-01/2013-01-02");
final String version = new DateTime().toString();
final Sink sink = new Sink(interval, schema, version);
sink.add(new InputRow()
{
@Override
public List<String> getDimensions()
{
return Lists.newArrayList();
}
@Override
public long getTimestampFromEpoch()
{
return new DateTime("2013-01-01").getMillis();
}
@Override
public List<String> getDimension(String dimension)
{
return Lists.newArrayList();
}
@Override
public float getFloatMetric(String metric)
{
return 0;
}
});
FireHydrant currHydrant = sink.getCurrIndex();
Assert.assertEquals(new Interval("2013-01-01/PT1M"), currHydrant.getIndex().getInterval());
FireHydrant swapHydrant = sink.swap();
sink.add(new InputRow()
{
@Override
public List<String> getDimensions()
{
return Lists.newArrayList();
}
@Override
public long getTimestampFromEpoch()
{
return new DateTime("2013-01-01").getMillis();
}
@Override
public List<String> getDimension(String dimension)
{
return Lists.newArrayList();
}
@Override
public float getFloatMetric(String metric)
{
return 0;
}
});
Assert.assertEquals(currHydrant, swapHydrant);
Assert.assertNotSame(currHydrant, sink.getCurrIndex());
Assert.assertEquals(new Interval("2013-01-01/PT1M"), sink.getCurrIndex().getIndex().getInterval());
Assert.assertEquals(2, Iterators.size(sink.iterator()));
}
}