put all concurrency tests back into server manager test, introduce better concurrency for reference counting

fjy 2013-08-01 13:13:03 -07:00
parent f76540c16c
commit e92e0c35a4
7 changed files with 467 additions and 646 deletions

pom.xml

@@ -38,7 +38,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.22.5</metamx.java-util.version>
<metamx.java-util.version>0.22.6</metamx.java-util.version>
<apache.curator.version>2.1.0-incubating</apache.curator.version>
</properties>

ReferenceCountingSegment.java

@@ -23,7 +23,9 @@ import com.metamx.druid.StorageAdapter;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.Interval;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
public class ReferenceCountingSegment implements Segment
{
@@ -43,9 +45,15 @@ public class ReferenceCountingSegment implements Segment
public Segment getBaseSegment()
{
synchronized (lock) {
if (!isClosed) {
return baseSegment;
}
return null;
}
}
public boolean isClosed()
{
return isClosed;
@@ -54,40 +62,82 @@ public class ReferenceCountingSegment implements Segment
@Override
public String getIdentifier()
{
synchronized (lock) {
if (!isClosed) {
return baseSegment.getIdentifier();
}
return null;
}
}
@Override
public Interval getDataInterval()
{
synchronized (lock) {
if (!isClosed) {
return baseSegment.getDataInterval();
}
return null;
}
}
@Override
public QueryableIndex asQueryableIndex()
{
synchronized (lock) {
if (!isClosed) {
return baseSegment.asQueryableIndex();
}
return null;
}
}
@Override
public StorageAdapter asStorageAdapter()
{
synchronized (lock) {
if (!isClosed) {
return baseSegment.asStorageAdapter();
}
return null;
}
}
@Override
public void close() throws IOException
{
synchronized (lock) {
if (!isClosed) {
baseSegment.close();
isClosed = true;
}
}
}
public void increment()
public Closeable increment()
{
synchronized (lock) {
if (!isClosed) {
numReferences++;
final AtomicBoolean decrementOnce = new AtomicBoolean(false);
return new Closeable()
{
@Override
public void close() throws IOException
{
if (decrementOnce.compareAndSet(false, true)) {
decrement();
}
}
};
}
return null;
}
}
public void decrement()
@@ -101,9 +151,6 @@ public class ReferenceCountingSegment implements Segment
catch (Exception e) {
log.error("Unable to close queryable index %s", getIdentifier());
}
finally {
isClosed = true;
}
}
}
}
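
The practical effect of the increment() change above: callers no longer pair increment() with a manual decrement(). They hold the Closeable it returns and close it when they are done, and closing it more than once decrements only once thanks to the AtomicBoolean guard. A minimal caller-side sketch, assuming only the API visible in this diff; the class name, runQuery method and query body are illustrative:

import com.metamx.druid.index.ReferenceCountingSegment;
import com.metamx.druid.index.Segment;

import java.io.Closeable;
import java.io.IOException;

public class ReferenceHoldingCaller
{
  // Illustrative caller: increment(), getBaseSegment() and the null-when-closed
  // behavior come from the diff above; everything else is a sketch.
  public void runQuery(ReferenceCountingSegment segment) throws IOException
  {
    final Closeable releaser = segment.increment();
    if (releaser == null) {
      return; // the segment was closed before we could take a reference
    }
    try {
      final Segment base = segment.getBaseSegment();
      // ... run the query against base ...
    }
    finally {
      releaser.close(); // the AtomicBoolean guard makes the decrement happen at most once
    }
  }
}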

ReferenceCountingSequence.java

@@ -19,13 +19,12 @@
package com.metamx.druid.index;
import com.metamx.common.guava.ResourceClosingYielder;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.common.guava.YieldingSequenceBase;
import java.io.IOException;
/**
*/
public class ReferenceCountingSequence<T> extends YieldingSequenceBase<T>
@@ -44,44 +43,6 @@ public class ReferenceCountingSequence<T> extends YieldingSequenceBase<T>
OutType initValue, YieldingAccumulator<OutType, T> accumulator
)
{
segment.increment();
return new ReferenceCountingYielder<OutType>(baseSequence.toYielder(initValue, accumulator), segment);
}
private static class ReferenceCountingYielder<OutType> implements Yielder<OutType>
{
private final Yielder<OutType> baseYielder;
private final ReferenceCountingSegment segment;
public ReferenceCountingYielder(Yielder<OutType> baseYielder, ReferenceCountingSegment segment)
{
this.baseYielder = baseYielder;
this.segment = segment;
}
@Override
public OutType get()
{
return baseYielder.get();
}
@Override
public Yielder<OutType> next(OutType initValue)
{
return new ReferenceCountingYielder<OutType>(baseYielder.next(initValue), segment);
}
@Override
public boolean isDone()
{
return baseYielder.isDone();
}
@Override
public void close() throws IOException
{
segment.decrement();
baseYielder.close();
}
return new ResourceClosingYielder<OutType>(baseSequence.toYielder(initValue, accumulator), segment.increment());
}
}
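
With the Closeable-returning increment() in place, the hand-rolled ReferenceCountingYielder above becomes unnecessary: the sequence hands the base yielder and the Closeable from segment.increment() to ResourceClosingYielder (from java-util), which closes that resource when the yielder itself is closed. A minimal, library-free analogue of the same idea in plain Iterator terms; all names here are illustrative and not part of the commit:

import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;

// Wraps an iterator and closes an attached resource (for example the Closeable
// handed out by a reference counter) exactly once when the consumer is done.
public class ResourceClosingIterator<T> implements Iterator<T>, Closeable
{
  private final Iterator<T> delegate;
  private final Closeable resource;
  private boolean closed = false;

  public ResourceClosingIterator(Iterator<T> delegate, Closeable resource)
  {
    this.delegate = delegate;
    this.resource = resource;
  }

  @Override
  public boolean hasNext()
  {
    return delegate.hasNext();
  }

  @Override
  public T next()
  {
    return delegate.next();
  }

  @Override
  public void remove()
  {
    delegate.remove();
  }

  @Override
  public void close() throws IOException
  {
    if (!closed) {
      closed = true;
      resource.close();
    }
  }
}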

SegmentForTesting.java (deleted)

@@ -1,95 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.coordination;
import com.metamx.druid.StorageAdapter;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.Segment;
import org.joda.time.Interval;
import java.io.IOException;
/**
*/
public class SegmentForTesting implements Segment
{
private final String version;
private final Interval interval;
private final Object lock = new Object();
private volatile boolean closed = false;
SegmentForTesting(
String version,
Interval interval
)
{
this.version = version;
this.interval = interval;
}
public String getVersion()
{
return version;
}
public Interval getInterval()
{
return interval;
}
@Override
public String getIdentifier()
{
return version;
}
public boolean isClosed()
{
return closed;
}
@Override
public Interval getDataInterval()
{
return interval;
}
@Override
public QueryableIndex asQueryableIndex()
{
throw new UnsupportedOperationException();
}
@Override
public StorageAdapter asStorageAdapter()
{
throw new UnsupportedOperationException();
}
@Override
public void close() throws IOException
{
synchronized (lock) {
closed = true;
}
}
}

ServerManagerConcurrencyTest.java (deleted)

@@ -1,326 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.coordination;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.metamx.common.IAE;
import com.metamx.common.Pair;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.common.guava.YieldingSequenceBase;
import com.metamx.druid.Druids;
import com.metamx.druid.Query;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.index.ReferenceCountingSegment;
import com.metamx.druid.index.Segment;
import com.metamx.druid.metrics.NoopServiceEmitter;
import com.metamx.druid.query.NoopQueryRunner;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.search.SearchQuery;
import com.metamx.druid.result.Result;
import com.metamx.druid.result.SearchResultValue;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
*/
public class ServerManagerConcurrencyTest
{
private TestServerManager serverManager;
private ConcurrencyTestQueryRunnerFactory factory;
private CountDownLatch queryWaitLatch;
private CountDownLatch queryNotifyLatch;
@Before
public void setUp() throws IOException
{
EmittingLogger.registerEmitter(new NoopServiceEmitter());
queryWaitLatch = new CountDownLatch(1);
queryNotifyLatch = new CountDownLatch(1);
factory = new ConcurrencyTestQueryRunnerFactory(queryWaitLatch, queryNotifyLatch);
serverManager = new TestServerManager(factory);
serverManager.loadQueryable("test", "1", new Interval("P1d/2011-04-01"));
serverManager.loadQueryable("test", "1", new Interval("P1d/2011-04-02"));
serverManager.loadQueryable("test", "2", new Interval("P1d/2011-04-02"));
serverManager.loadQueryable("test", "1", new Interval("P1d/2011-04-03"));
serverManager.loadQueryable("test", "1", new Interval("P1d/2011-04-04"));
serverManager.loadQueryable("test", "1", new Interval("P1d/2011-04-05"));
serverManager.loadQueryable("test", "2", new Interval("PT1h/2011-04-04T01"));
serverManager.loadQueryable("test", "2", new Interval("PT1h/2011-04-04T02"));
serverManager.loadQueryable("test", "2", new Interval("PT1h/2011-04-04T03"));
serverManager.loadQueryable("test", "2", new Interval("PT1h/2011-04-04T05"));
serverManager.loadQueryable("test", "2", new Interval("PT1h/2011-04-04T06"));
serverManager.loadQueryable("test2", "1", new Interval("P1d/2011-04-01"));
serverManager.loadQueryable("test2", "1", new Interval("P1d/2011-04-02"));
}
@Test
public void testReferenceCounting() throws Exception
{
serverManager.loadQueryable("test", "3", new Interval("2011-04-04/2011-04-05"));
Future future = assertQueryable(
QueryGranularity.DAY,
"test", new Interval("2011-04-04/2011-04-06"),
ImmutableList.<Pair<String, Interval>>of(
new Pair<String, Interval>("3", new Interval("2011-04-04/2011-04-05"))
)
);
queryNotifyLatch.await();
Assert.assertTrue(factory.getAdapters().size() == 1);
for (SegmentForTesting segmentForTesting : factory.getAdapters()) {
Assert.assertFalse(segmentForTesting.isClosed());
}
queryWaitLatch.countDown();
future.get();
serverManager.dropQueryable("test", "3", new Interval("2011-04-04/2011-04-05"));
for (SegmentForTesting segmentForTesting : factory.getAdapters()) {
Assert.assertTrue(segmentForTesting.isClosed());
}
}
@Test
public void testReferenceCountingWhileQueryExecuting() throws Exception
{
serverManager.loadQueryable("test", "3", new Interval("2011-04-04/2011-04-05"));
Future future = assertQueryable(
QueryGranularity.DAY,
"test", new Interval("2011-04-04/2011-04-06"),
ImmutableList.<Pair<String, Interval>>of(
new Pair<String, Interval>("3", new Interval("2011-04-04/2011-04-05"))
)
);
queryNotifyLatch.await();
Assert.assertTrue(factory.getAdapters().size() == 1);
for (SegmentForTesting segmentForTesting : factory.getAdapters()) {
Assert.assertFalse(segmentForTesting.isClosed());
}
serverManager.dropQueryable("test", "3", new Interval("2011-04-04/2011-04-05"));
for (SegmentForTesting segmentForTesting : factory.getAdapters()) {
Assert.assertFalse(segmentForTesting.isClosed());
}
queryWaitLatch.countDown();
future.get();
for (SegmentForTesting segmentForTesting : factory.getAdapters()) {
Assert.assertTrue(segmentForTesting.isClosed());
}
}
private <T> Future assertQueryable(
QueryGranularity granularity,
String dataSource,
Interval interval,
List<Pair<String, Interval>> expected
)
{
final Iterator<Pair<String, Interval>> expectedIter = expected.iterator();
final List<Interval> intervals = Arrays.asList(interval);
final SearchQuery query = Druids.newSearchQueryBuilder()
.dataSource(dataSource)
.intervals(intervals)
.granularity(granularity)
.limit(10000)
.query("wow")
.build();
final QueryRunner<Result<SearchResultValue>> runner = serverManager.getQueryRunnerForIntervals(
query,
intervals
);
return Executors.newSingleThreadExecutor().submit(
new Runnable()
{
@Override
public void run()
{
Sequence<Result<SearchResultValue>> seq = runner.run(query);
Sequences.toList(seq, Lists.<Result<SearchResultValue>>newArrayList());
Iterator<SegmentForTesting> adaptersIter = factory.getAdapters().iterator();
while (expectedIter.hasNext() && adaptersIter.hasNext()) {
Pair<String, Interval> expectedVals = expectedIter.next();
SegmentForTesting value = adaptersIter.next();
Assert.assertEquals(expectedVals.lhs, value.getVersion());
Assert.assertEquals(expectedVals.rhs, value.getInterval());
}
Assert.assertFalse(expectedIter.hasNext());
Assert.assertFalse(adaptersIter.hasNext());
}
}
);
}
public static class ConcurrencyTestQueryRunnerFactory extends ServerManagerTest.MyQueryRunnerFactory
{
private final CountDownLatch waitLatch;
private final CountDownLatch notifyLatch;
private List<SegmentForTesting> adapters = Lists.newArrayList();
public ConcurrencyTestQueryRunnerFactory(CountDownLatch waitLatch, CountDownLatch notifyLatch)
{
this.waitLatch = waitLatch;
this.notifyLatch = notifyLatch;
}
@Override
public QueryRunner<Result<SearchResultValue>> createRunner(Segment adapter)
{
if (!(adapter instanceof ReferenceCountingSegment)) {
throw new IAE("Expected instance of ReferenceCountingSegment, got %s", adapter.getClass());
}
adapters.add((SegmentForTesting) ((ReferenceCountingSegment) adapter).getBaseSegment());
return new BlockingQueryRunner<Result<SearchResultValue>>(
new NoopQueryRunner<Result<SearchResultValue>>(),
waitLatch,
notifyLatch
);
}
@Override
public List<SegmentForTesting> getAdapters()
{
return adapters;
}
@Override
public void clearAdapters()
{
adapters.clear();
}
}
private static class BlockingQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunner<T> runner;
private final CountDownLatch waitLatch;
private final CountDownLatch notifyLatch;
public BlockingQueryRunner(
QueryRunner<T> runner,
CountDownLatch waitLatch,
CountDownLatch notifyLatch
)
{
this.runner = runner;
this.waitLatch = waitLatch;
this.notifyLatch = notifyLatch;
}
@Override
public Sequence<T> run(Query<T> query)
{
return new BlockingSequence<T>(runner.run(query), waitLatch, notifyLatch);
}
}
private static class BlockingSequence<T> extends YieldingSequenceBase<T>
{
private final Sequence<T> baseSequence;
private final CountDownLatch waitLatch;
private final CountDownLatch notifyLatch;
public BlockingSequence(
Sequence<T> baseSequence,
CountDownLatch waitLatch,
CountDownLatch notifyLatch
)
{
this.baseSequence = baseSequence;
this.waitLatch = waitLatch;
this.notifyLatch = notifyLatch;
}
@Override
public <OutType> Yielder<OutType> toYielder(
final OutType initValue, final YieldingAccumulator<OutType, T> accumulator
)
{
notifyLatch.countDown();
final Yielder<OutType> baseYielder = baseSequence.toYielder(initValue, accumulator);
return new Yielder<OutType>()
{
@Override
public OutType get()
{
try {
waitLatch.await();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return baseYielder.get();
}
@Override
public Yielder<OutType> next(OutType initValue)
{
return baseYielder.next(initValue);
}
@Override
public boolean isDone()
{
return baseYielder.isDone();
}
@Override
public void close() throws IOException
{
baseYielder.close();
}
};
}
}
}

ServerManagerTest.java

@@ -22,28 +22,42 @@ package com.metamx.druid.coordination;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.IAE;
import com.metamx.common.MapUtils;
import com.metamx.common.Pair;
import com.metamx.common.guava.ConcatSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.common.guava.YieldingSequenceBase;
import com.metamx.druid.Druids;
import com.metamx.druid.Query;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.StorageAdapter;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.ReferenceCountingSegment;
import com.metamx.druid.index.Segment;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.metrics.NoopServiceEmitter;
import com.metamx.druid.query.ConcatQueryRunner;
import com.metamx.druid.query.MetricManipulationFn;
import com.metamx.druid.query.NoopQueryRunner;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.query.search.SearchQuery;
import com.metamx.druid.result.Result;
import com.metamx.druid.result.SearchResultValue;
import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceMetricEvent;
import org.joda.time.Interval;
@@ -55,42 +69,85 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
*/
public class ServerManagerTest
{
TestServerManager serverManager;
MyQueryRunnerFactory factory;
private ServerManager serverManager;
private MyQueryRunnerFactory factory;
private CountDownLatch queryWaitLatch;
private CountDownLatch queryNotifyLatch;
private ExecutorService serverManagerExec;
@Before
public void setUp() throws IOException
{
EmittingLogger.registerEmitter(new NoopServiceEmitter());
factory = new MyQueryRunnerFactory();
serverManager = new TestServerManager(factory);
queryWaitLatch = new CountDownLatch(1);
queryNotifyLatch = new CountDownLatch(1);
factory = new MyQueryRunnerFactory(queryWaitLatch, queryNotifyLatch);
serverManagerExec = Executors.newFixedThreadPool(2);
serverManager = new ServerManager(
new SegmentLoader()
{
@Override
public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException
{
return false;
}
serverManager.loadQueryable("test", "1", new Interval("P1d/2011-04-01"));
serverManager.loadQueryable("test", "1", new Interval("P1d/2011-04-02"));
serverManager.loadQueryable("test", "2", new Interval("P1d/2011-04-02"));
serverManager.loadQueryable("test", "1", new Interval("P1d/2011-04-03"));
serverManager.loadQueryable("test", "1", new Interval("P1d/2011-04-04"));
serverManager.loadQueryable("test", "1", new Interval("P1d/2011-04-05"));
serverManager.loadQueryable("test", "2", new Interval("PT1h/2011-04-04T01"));
serverManager.loadQueryable("test", "2", new Interval("PT1h/2011-04-04T02"));
serverManager.loadQueryable("test", "2", new Interval("PT1h/2011-04-04T03"));
serverManager.loadQueryable("test", "2", new Interval("PT1h/2011-04-04T05"));
serverManager.loadQueryable("test", "2", new Interval("PT1h/2011-04-04T06"));
serverManager.loadQueryable("test2", "1", new Interval("P1d/2011-04-01"));
serverManager.loadQueryable("test2", "1", new Interval("P1d/2011-04-02"));
@Override
public Segment getSegment(final DataSegment segment)
{
return new SegmentForTesting(
MapUtils.getString(segment.getLoadSpec(), "version"),
(Interval) segment.getLoadSpec().get("interval")
);
}
@Override
public void cleanup(DataSegment segment) throws SegmentLoadingException
{
}
},
new QueryRunnerFactoryConglomerate()
{
@Override
public <T, QueryType extends Query<T>> QueryRunnerFactory<T, QueryType> findFactory(QueryType query)
{
return (QueryRunnerFactory) factory;
}
},
new NoopServiceEmitter(),
serverManagerExec
);
loadQueryable("test", "1", new Interval("P1d/2011-04-01"));
loadQueryable("test", "1", new Interval("P1d/2011-04-02"));
loadQueryable("test", "2", new Interval("P1d/2011-04-02"));
loadQueryable("test", "1", new Interval("P1d/2011-04-03"));
loadQueryable("test", "1", new Interval("P1d/2011-04-04"));
loadQueryable("test", "1", new Interval("P1d/2011-04-05"));
loadQueryable("test", "2", new Interval("PT1h/2011-04-04T01"));
loadQueryable("test", "2", new Interval("PT1h/2011-04-04T02"));
loadQueryable("test", "2", new Interval("PT1h/2011-04-04T03"));
loadQueryable("test", "2", new Interval("PT1h/2011-04-04T05"));
loadQueryable("test", "2", new Interval("PT1h/2011-04-04T06"));
loadQueryable("test2", "1", new Interval("P1d/2011-04-01"));
loadQueryable("test2", "1", new Interval("P1d/2011-04-02"));
}
@Test
public void testSimpleGet()
{
assertQueryable(
Future future = assertQueryable(
QueryGranularity.DAY,
"test",
new Interval("P1d/2011-04-01"),
@@ -98,8 +155,10 @@ public class ServerManagerTest
new Pair<String, Interval>("1", new Interval("P1d/2011-04-01"))
)
);
waitForTestVerificationAndCleanup(future);
assertQueryable(
future = assertQueryable(
QueryGranularity.DAY,
"test", new Interval("P2d/2011-04-02"),
ImmutableList.<Pair<String, Interval>>of(
@@ -107,6 +166,7 @@ public class ServerManagerTest
new Pair<String, Interval>("2", new Interval("P1d/2011-04-02"))
)
);
waitForTestVerificationAndCleanup(future);
}
@Test
@@ -115,41 +175,44 @@ public class ServerManagerTest
final String dataSouce = "test";
final Interval interval = new Interval("2011-04-01/2011-04-02");
assertQueryable(
Future future = assertQueryable(
QueryGranularity.DAY,
dataSouce, interval,
ImmutableList.<Pair<String, Interval>>of(
new Pair<String, Interval>("2", interval)
)
);
waitForTestVerificationAndCleanup(future);
serverManager.dropQueryable(dataSouce, "2", interval);
assertQueryable(
dropQueryable(dataSouce, "2", interval);
future = assertQueryable(
QueryGranularity.DAY,
dataSouce, interval,
ImmutableList.<Pair<String, Interval>>of(
new Pair<String, Interval>("1", interval)
)
);
waitForTestVerificationAndCleanup(future);
}
@Test
public void testDelete2() throws Exception
{
serverManager.loadQueryable("test", "3", new Interval("2011-04-04/2011-04-05"));
loadQueryable("test", "3", new Interval("2011-04-04/2011-04-05"));
assertQueryable(
Future future = assertQueryable(
QueryGranularity.DAY,
"test", new Interval("2011-04-04/2011-04-06"),
ImmutableList.<Pair<String, Interval>>of(
new Pair<String, Interval>("3", new Interval("2011-04-04/2011-04-05"))
)
);
waitForTestVerificationAndCleanup(future);
serverManager.dropQueryable("test", "3", new Interval("2011-04-04/2011-04-05"));
serverManager.dropQueryable("test", "1", new Interval("2011-04-04/2011-04-05"));
dropQueryable("test", "3", new Interval("2011-04-04/2011-04-05"));
dropQueryable("test", "1", new Interval("2011-04-04/2011-04-05"));
assertQueryable(
future = assertQueryable(
QueryGranularity.HOUR,
"test", new Interval("2011-04-04/2011-04-04T06"),
ImmutableList.<Pair<String, Interval>>of(
@@ -160,8 +223,9 @@ public class ServerManagerTest
new Pair<String, Interval>("2", new Interval("2011-04-04T05/2011-04-04T06"))
)
);
waitForTestVerificationAndCleanup(future);
assertQueryable(
future = assertQueryable(
QueryGranularity.HOUR,
"test", new Interval("2011-04-04/2011-04-04T03"),
ImmutableList.<Pair<String, Interval>>of(
@@ -170,8 +234,9 @@ public class ServerManagerTest
new Pair<String, Interval>("2", new Interval("2011-04-04T02/2011-04-04T03"))
)
);
waitForTestVerificationAndCleanup(future);
assertQueryable(
future = assertQueryable(
QueryGranularity.HOUR,
"test", new Interval("2011-04-04T04/2011-04-04T06"),
ImmutableList.<Pair<String, Interval>>of(
@@ -179,16 +244,96 @@ public class ServerManagerTest
new Pair<String, Interval>("2", new Interval("2011-04-04T05/2011-04-04T06"))
)
);
waitForTestVerificationAndCleanup(future);
}
private <T> void assertQueryable(
@Test
public void testReferenceCounting() throws Exception
{
loadQueryable("test", "3", new Interval("2011-04-04/2011-04-05"));
Future future = assertQueryable(
QueryGranularity.DAY,
"test", new Interval("2011-04-04/2011-04-06"),
ImmutableList.<Pair<String, Interval>>of(
new Pair<String, Interval>("3", new Interval("2011-04-04/2011-04-05"))
)
);
queryNotifyLatch.await();
Assert.assertTrue(factory.getAdapters().size() == 1);
for (SegmentForTesting segmentForTesting : factory.getAdapters()) {
Assert.assertFalse(segmentForTesting.isClosed());
}
queryWaitLatch.countDown();
future.get();
dropQueryable("test", "3", new Interval("2011-04-04/2011-04-05"));
for (SegmentForTesting segmentForTesting : factory.getAdapters()) {
Assert.assertTrue(segmentForTesting.isClosed());
}
}
@Test
public void testReferenceCountingWhileQueryExecuting() throws Exception
{
loadQueryable("test", "3", new Interval("2011-04-04/2011-04-05"));
Future future = assertQueryable(
QueryGranularity.DAY,
"test", new Interval("2011-04-04/2011-04-06"),
ImmutableList.<Pair<String, Interval>>of(
new Pair<String, Interval>("3", new Interval("2011-04-04/2011-04-05"))
)
);
queryNotifyLatch.await();
Assert.assertTrue(factory.getAdapters().size() == 1);
for (SegmentForTesting segmentForTesting : factory.getAdapters()) {
Assert.assertFalse(segmentForTesting.isClosed());
}
dropQueryable("test", "3", new Interval("2011-04-04/2011-04-05"));
for (SegmentForTesting segmentForTesting : factory.getAdapters()) {
Assert.assertFalse(segmentForTesting.isClosed());
}
queryWaitLatch.countDown();
future.get();
for (SegmentForTesting segmentForTesting : factory.getAdapters()) {
Assert.assertTrue(segmentForTesting.isClosed());
}
}
private void waitForTestVerificationAndCleanup(Future future)
{
try {
queryNotifyLatch.await();
queryWaitLatch.countDown();
future.get();
factory.clearAdapters();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
private <T> Future assertQueryable(
QueryGranularity granularity,
String dataSource,
Interval interval,
List<Pair<String, Interval>> expected
)
{
Iterator<Pair<String, Interval>> expectedIter = expected.iterator();
final Iterator<Pair<String, Interval>> expectedIter = expected.iterator();
final List<Interval> intervals = Arrays.asList(interval);
final SearchQuery query = Druids.newSearchQueryBuilder()
.dataSource(dataSource)
@@ -197,8 +342,18 @@ public class ServerManagerTest
.limit(10000)
.query("wow")
.build();
QueryRunner<Result<SearchResultValue>> runner = serverManager.getQueryRunnerForIntervals(query, intervals);
final Sequence<Result<SearchResultValue>> seq = runner.run(query);
final QueryRunner<Result<SearchResultValue>> runner = serverManager.getQueryRunnerForIntervals(
query,
intervals
);
return serverManagerExec.submit(
new Runnable()
{
@Override
public void run()
{
Sequence<Result<SearchResultValue>> seq = runner.run(query);
Sequences.toList(seq, Lists.<Result<SearchResultValue>>newArrayList());
Iterator<SegmentForTesting> adaptersIter = factory.getAdapters().iterator();
@@ -212,14 +367,67 @@ public class ServerManagerTest
Assert.assertFalse(expectedIter.hasNext());
Assert.assertFalse(adaptersIter.hasNext());
}
}
);
}
factory.clearAdapters();
public void loadQueryable(String dataSource, String version, Interval interval) throws IOException
{
try {
serverManager.loadSegment(
new DataSegment(
dataSource,
interval,
version,
ImmutableMap.<String, Object>of("version", version, "interval", interval),
Arrays.asList("dim1", "dim2", "dim3"),
Arrays.asList("metric1", "metric2"),
new NoneShardSpec(),
IndexIO.CURRENT_VERSION_ID,
123l
)
);
}
catch (SegmentLoadingException e) {
throw new RuntimeException(e);
}
}
public void dropQueryable(String dataSource, String version, Interval interval)
{
try {
serverManager.dropSegment(
new DataSegment(
dataSource,
interval,
version,
ImmutableMap.<String, Object>of("version", version, "interval", interval),
Arrays.asList("dim1", "dim2", "dim3"),
Arrays.asList("metric1", "metric2"),
new NoneShardSpec(),
IndexIO.CURRENT_VERSION_ID,
123l
)
);
}
catch (SegmentLoadingException e) {
throw new RuntimeException(e);
}
}
public static class MyQueryRunnerFactory implements QueryRunnerFactory<Result<SearchResultValue>, SearchQuery>
{
private final CountDownLatch waitLatch;
private final CountDownLatch notifyLatch;
private List<SegmentForTesting> adapters = Lists.newArrayList();
public MyQueryRunnerFactory(CountDownLatch waitLatch, CountDownLatch notifyLatch)
{
this.waitLatch = waitLatch;
this.notifyLatch = notifyLatch;
}
@Override
public QueryRunner<Result<SearchResultValue>> createRunner(Segment adapter)
{
@@ -227,7 +435,11 @@ public class ServerManagerTest
throw new IAE("Expected instance of ReferenceCountingSegment, got %s", adapter.getClass());
}
adapters.add((SegmentForTesting) ((ReferenceCountingSegment) adapter).getBaseSegment());
return new NoopQueryRunner<Result<SearchResultValue>>();
return new BlockingQueryRunner<Result<SearchResultValue>>(
new NoopQueryRunner<Result<SearchResultValue>>(),
waitLatch,
notifyLatch
);
}
@Override
@@ -289,4 +501,154 @@ public class ServerManagerTest
};
}
}
private static class SegmentForTesting implements Segment
{
private final String version;
private final Interval interval;
private final Object lock = new Object();
private volatile boolean closed = false;
SegmentForTesting(
String version,
Interval interval
)
{
this.version = version;
this.interval = interval;
}
public String getVersion()
{
return version;
}
public Interval getInterval()
{
return interval;
}
@Override
public String getIdentifier()
{
return version;
}
public boolean isClosed()
{
return closed;
}
@Override
public Interval getDataInterval()
{
return interval;
}
@Override
public QueryableIndex asQueryableIndex()
{
throw new UnsupportedOperationException();
}
@Override
public StorageAdapter asStorageAdapter()
{
throw new UnsupportedOperationException();
}
@Override
public void close() throws IOException
{
synchronized (lock) {
closed = true;
}
}
}
private static class BlockingQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunner<T> runner;
private final CountDownLatch waitLatch;
private final CountDownLatch notifyLatch;
public BlockingQueryRunner(
QueryRunner<T> runner,
CountDownLatch waitLatch,
CountDownLatch notifyLatch
)
{
this.runner = runner;
this.waitLatch = waitLatch;
this.notifyLatch = notifyLatch;
}
@Override
public Sequence<T> run(Query<T> query)
{
return new BlockingSequence<T>(runner.run(query), waitLatch, notifyLatch);
}
}
private static class BlockingSequence<T> extends YieldingSequenceBase<T>
{
private final Sequence<T> baseSequence;
private final CountDownLatch waitLatch;
private final CountDownLatch notifyLatch;
public BlockingSequence(
Sequence<T> baseSequence,
CountDownLatch waitLatch,
CountDownLatch notifyLatch
)
{
this.baseSequence = baseSequence;
this.waitLatch = waitLatch;
this.notifyLatch = notifyLatch;
}
@Override
public <OutType> Yielder<OutType> toYielder(
final OutType initValue, final YieldingAccumulator<OutType, T> accumulator
)
{
notifyLatch.countDown();
final Yielder<OutType> baseYielder = baseSequence.toYielder(initValue, accumulator);
return new Yielder<OutType>()
{
@Override
public OutType get()
{
try {
waitLatch.await();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return baseYielder.get();
}
@Override
public Yielder<OutType> next(OutType initValue)
{
return baseYielder.next(initValue);
}
@Override
public boolean isDone()
{
return baseYielder.isDone();
}
@Override
public void close() throws IOException
{
baseYielder.close();
}
};
}
}
}
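
The BlockingQueryRunner and BlockingSequence above implement a two-latch handshake: the query thread counts down notifyLatch as soon as it starts yielding and then blocks on waitLatch, so the test thread can assert on segment state while the query is provably in flight before releasing it. A self-contained sketch of that handshake, independent of the Druid classes; all names are illustrative:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TwoLatchHandshake
{
  public static void main(String[] args) throws Exception
  {
    final CountDownLatch notifyLatch = new CountDownLatch(1); // worker -> test: "I have started"
    final CountDownLatch waitLatch = new CountDownLatch(1);   // test -> worker: "you may finish"

    ExecutorService exec = Executors.newSingleThreadExecutor();
    Future<?> future = exec.submit(
        new Runnable()
        {
          @Override
          public void run()
          {
            notifyLatch.countDown();   // signal that the "query" is executing
            try {
              waitLatch.await();       // block mid-query until the test releases us
            }
            catch (InterruptedException e) {
              throw new RuntimeException(e);
            }
            // ... produce results, release references, etc. ...
          }
        }
    );

    notifyLatch.await();               // the query is now guaranteed to be in flight
    // ... assert that segments are still open / references are still held ...
    waitLatch.countDown();             // let the query finish
    future.get();                      // surface any failure thrown on the worker thread
    // ... assert that references were released afterwards ...
    exec.shutdown();
  }
}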

TestServerManager.java (deleted)

@@ -1,128 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.coordination;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.MapUtils;
import com.metamx.druid.Query;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.Segment;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.metrics.NoopServiceEmitter;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.shard.NoneShardSpec;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.Arrays;
/**
*/
public class TestServerManager extends ServerManager
{
public TestServerManager(
final QueryRunnerFactory factory
)
{
super(
new SegmentLoader()
{
@Override
public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException
{
return false;
}
@Override
public Segment getSegment(final DataSegment segment)
{
return new SegmentForTesting(
MapUtils.getString(segment.getLoadSpec(), "version"),
(Interval) segment.getLoadSpec().get("interval")
);
}
@Override
public void cleanup(DataSegment segment) throws SegmentLoadingException
{
}
},
new QueryRunnerFactoryConglomerate()
{
@Override
public <T, QueryType extends Query<T>> QueryRunnerFactory<T, QueryType> findFactory(QueryType query)
{
return (QueryRunnerFactory) factory;
}
},
new NoopServiceEmitter(),
MoreExecutors.sameThreadExecutor()
);
}
public void loadQueryable(String dataSource, String version, Interval interval) throws IOException
{
try {
super.loadSegment(
new DataSegment(
dataSource,
interval,
version,
ImmutableMap.<String, Object>of("version", version, "interval", interval),
Arrays.asList("dim1", "dim2", "dim3"),
Arrays.asList("metric1", "metric2"),
new NoneShardSpec(),
IndexIO.CURRENT_VERSION_ID,
123l
)
);
}
catch (SegmentLoadingException e) {
throw new RuntimeException(e);
}
}
public void dropQueryable(String dataSource, String version, Interval interval)
{
try {
super.dropSegment(
new DataSegment(
dataSource,
interval,
version,
ImmutableMap.<String, Object>of("version", version, "interval", interval),
Arrays.asList("dim1", "dim2", "dim3"),
Arrays.asList("metric1", "metric2"),
new NoneShardSpec(),
IndexIO.CURRENT_VERSION_ID,
123l
)
);
}
catch (SegmentLoadingException e) {
throw new RuntimeException(e);
}
}
}