Merge pull request #217 from metamx/server_manager_unit_test

Robustness fixes for properly keeping track of when Segments are accessible
This commit is contained in:
fjy 2013-08-13 17:15:13 -07:00
commit 6d7b8bc8a6
3 changed files with 76 additions and 70 deletions

View File

@ -46,11 +46,11 @@ public class ReferenceCountingSegment implements Segment
public Segment getBaseSegment() public Segment getBaseSegment()
{ {
synchronized (lock) { synchronized (lock) {
if (!isClosed) { if (isClosed) {
return baseSegment; return null;
} }
return null; return baseSegment;
} }
} }
@ -68,11 +68,11 @@ public class ReferenceCountingSegment implements Segment
public String getIdentifier() public String getIdentifier()
{ {
synchronized (lock) { synchronized (lock) {
if (!isClosed) { if (isClosed) {
return baseSegment.getIdentifier(); return null;
} }
return null; return baseSegment.getIdentifier();
} }
} }
@ -80,11 +80,11 @@ public class ReferenceCountingSegment implements Segment
public Interval getDataInterval() public Interval getDataInterval()
{ {
synchronized (lock) { synchronized (lock) {
if (!isClosed) { if (isClosed) {
return baseSegment.getDataInterval(); return null;
} }
return null; return baseSegment.getDataInterval();
} }
} }
@ -92,11 +92,11 @@ public class ReferenceCountingSegment implements Segment
public QueryableIndex asQueryableIndex() public QueryableIndex asQueryableIndex()
{ {
synchronized (lock) { synchronized (lock) {
if (!isClosed) { if (isClosed) {
return baseSegment.asQueryableIndex(); return null;
} }
return null; return baseSegment.asQueryableIndex();
} }
} }
@ -104,11 +104,11 @@ public class ReferenceCountingSegment implements Segment
public StorageAdapter asStorageAdapter() public StorageAdapter asStorageAdapter()
{ {
synchronized (lock) { synchronized (lock) {
if (!isClosed) { if (isClosed) {
return baseSegment.asStorageAdapter(); return null;
} }
return null; return baseSegment.asStorageAdapter();
} }
} }
@ -116,24 +116,18 @@ public class ReferenceCountingSegment implements Segment
public void close() throws IOException public void close() throws IOException
{ {
synchronized (lock) { synchronized (lock) {
log.info("Trying to close %s", baseSegment.getIdentifier()); if (isClosed) {
if (!isClosed) {
if (numReferences > 0) {
log.info(
"%d references to %s still exist. Decrementing instead.",
numReferences,
baseSegment.getIdentifier()
);
decrement();
} else {
log.info("Closing %s, numReferences: %d", baseSegment.getIdentifier(), numReferences);
baseSegment.close();
isClosed = true;
}
} else {
log.info("Failed to close, %s is closed already", baseSegment.getIdentifier()); log.info("Failed to close, %s is closed already", baseSegment.getIdentifier());
return;
}
if (numReferences > 0) {
log.info("%d references to %s still exist. Decrementing.", numReferences, baseSegment.getIdentifier());
decrement();
} else {
log.info("Closing %s", baseSegment.getIdentifier());
innerClose();
} }
} }
} }
@ -141,38 +135,50 @@ public class ReferenceCountingSegment implements Segment
public Closeable increment() public Closeable increment()
{ {
synchronized (lock) { synchronized (lock) {
if (!isClosed) { if (isClosed) {
numReferences++; return null;
final AtomicBoolean decrementOnce = new AtomicBoolean(false);
return new Closeable()
{
@Override
public void close() throws IOException
{
if (decrementOnce.compareAndSet(false, true)) {
decrement();
}
}
};
} }
return null; numReferences++;
final AtomicBoolean decrementOnce = new AtomicBoolean(false);
return new Closeable()
{
@Override
public void close() throws IOException
{
if (decrementOnce.compareAndSet(false, true)) {
decrement();
}
}
};
} }
} }
private void decrement() private void decrement()
{ {
synchronized (lock) { synchronized (lock) {
if (!isClosed) { if (isClosed) {
if (--numReferences < 0) { return;
try { }
close();
} if (--numReferences < 0) {
catch (Exception e) { try {
log.error("Unable to close queryable index %s", getIdentifier()); innerClose();
} }
catch (Exception e) {
log.error("Unable to close queryable index %s", getIdentifier());
} }
} }
} }
} }
private void innerClose() throws IOException
{
synchronized (lock) {
log.info("Closing %s, numReferences: %d", baseSegment.getIdentifier(), numReferences);
isClosed = true;
baseSegment.close();
}
}
} }

View File

@ -12,7 +12,7 @@ import java.io.Closeable;
*/ */
public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T> public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
{ {
private final QueryRunner<T> runner; private final QueryRunnerFactory<T, Query<T>> factory;
private final ReferenceCountingSegment adapter; private final ReferenceCountingSegment adapter;
public ReferenceCountingSegmentQueryRunner( public ReferenceCountingSegmentQueryRunner(
@ -20,9 +20,8 @@ public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
ReferenceCountingSegment adapter ReferenceCountingSegment adapter
) )
{ {
this.factory = factory;
this.adapter = adapter; this.adapter = adapter;
this.runner = factory.createRunner(adapter);
} }
@Override @Override
@ -30,7 +29,7 @@ public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
{ {
final Closeable closeable = adapter.increment(); final Closeable closeable = adapter.increment();
try { try {
final Sequence<T> baseSequence = runner.run(query); final Sequence<T> baseSequence = factory.createRunner(adapter).run(query);
return new ResourceClosingSequence<T>(baseSequence, closeable); return new ResourceClosingSequence<T>(baseSequence, closeable);
} }

View File

@ -73,6 +73,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/** /**
*/ */
@ -262,7 +263,7 @@ public class ServerManagerTest
) )
); );
queryNotifyLatch.await(); queryNotifyLatch.await(25, TimeUnit.MILLISECONDS);
Assert.assertEquals(1, factory.getSegmentReferences().size()); Assert.assertEquals(1, factory.getSegmentReferences().size());
@ -301,7 +302,7 @@ public class ServerManagerTest
) )
); );
queryNotifyLatch.await(); queryNotifyLatch.await(25, TimeUnit.MILLISECONDS);
Assert.assertEquals(1, factory.getSegmentReferences().size()); Assert.assertEquals(1, factory.getSegmentReferences().size());
@ -344,7 +345,7 @@ public class ServerManagerTest
) )
); );
queryNotifyLatch.await(); queryNotifyLatch.await(25, TimeUnit.MILLISECONDS);
Assert.assertEquals(1, factory.getSegmentReferences().size()); Assert.assertEquals(1, factory.getSegmentReferences().size());
@ -378,7 +379,7 @@ public class ServerManagerTest
private void waitForTestVerificationAndCleanup(Future future) private void waitForTestVerificationAndCleanup(Future future)
{ {
try { try {
queryNotifyLatch.await(); queryNotifyLatch.await(25, TimeUnit.MILLISECONDS);
queryWaitYieldLatch.countDown(); queryWaitYieldLatch.countDown();
queryWaitLatch.countDown(); queryWaitLatch.countDown();
future.get(); future.get();
@ -505,13 +506,13 @@ public class ServerManagerTest
if (!(adapter instanceof ReferenceCountingSegment)) { if (!(adapter instanceof ReferenceCountingSegment)) {
throw new IAE("Expected instance of ReferenceCountingSegment, got %s", adapter.getClass()); throw new IAE("Expected instance of ReferenceCountingSegment, got %s", adapter.getClass());
} }
segmentReferences.add((ReferenceCountingSegment) adapter); final ReferenceCountingSegment segment = (ReferenceCountingSegment) adapter;
adapters.add((SegmentForTesting) ((ReferenceCountingSegment) adapter).getBaseSegment());
Assert.assertTrue(segment.getNumReferences() > 0);
segmentReferences.add(segment);
adapters.add((SegmentForTesting) segment.getBaseSegment());
return new BlockingQueryRunner<Result<SearchResultValue>>( return new BlockingQueryRunner<Result<SearchResultValue>>(
new NoopQueryRunner<Result<SearchResultValue>>(), new NoopQueryRunner<Result<SearchResultValue>>(), waitLatch, waitYieldLatch, notifyLatch
waitLatch,
waitYieldLatch,
notifyLatch
); );
} }
@ -702,7 +703,7 @@ public class ServerManagerTest
notifyLatch.countDown(); notifyLatch.countDown();
try { try {
waitYieldLatch.await(); waitYieldLatch.await(25, TimeUnit.MILLISECONDS);
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);
@ -715,7 +716,7 @@ public class ServerManagerTest
public OutType get() public OutType get()
{ {
try { try {
waitLatch.await(); waitLatch.await(25, TimeUnit.MILLISECONDS);
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);