From 145e08682cba02fdbaf9ad71763542f3ee8174da Mon Sep 17 00:00:00 2001 From: cheddar Date: Tue, 13 Aug 2013 17:01:24 -0700 Subject: [PATCH] 1) Add check in ServerManagerTest to make sure that the Segment has been "checked out" before the factory ever sees it. 2) Some code readability changes to ReferenceCountingSegment --- .../druid/index/ReferenceCountingSegment.java | 114 +++++++++--------- .../ReferenceCountingSegmentQueryRunner.java | 7 +- .../druid/coordination/ServerManagerTest.java | 25 ++-- 3 files changed, 76 insertions(+), 70 deletions(-) diff --git a/server/src/main/java/com/metamx/druid/index/ReferenceCountingSegment.java b/server/src/main/java/com/metamx/druid/index/ReferenceCountingSegment.java index 1460f2d8634..967537a1dbb 100644 --- a/server/src/main/java/com/metamx/druid/index/ReferenceCountingSegment.java +++ b/server/src/main/java/com/metamx/druid/index/ReferenceCountingSegment.java @@ -46,11 +46,11 @@ public class ReferenceCountingSegment implements Segment public Segment getBaseSegment() { synchronized (lock) { - if (!isClosed) { - return baseSegment; + if (isClosed) { + return null; } - return null; + return baseSegment; } } @@ -68,11 +68,11 @@ public class ReferenceCountingSegment implements Segment public String getIdentifier() { synchronized (lock) { - if (!isClosed) { - return baseSegment.getIdentifier(); + if (isClosed) { + return null; } - return null; + return baseSegment.getIdentifier(); } } @@ -80,11 +80,11 @@ public class ReferenceCountingSegment implements Segment public Interval getDataInterval() { synchronized (lock) { - if (!isClosed) { - return baseSegment.getDataInterval(); + if (isClosed) { + return null; } - return null; + return baseSegment.getDataInterval(); } } @@ -92,11 +92,11 @@ public class ReferenceCountingSegment implements Segment public QueryableIndex asQueryableIndex() { synchronized (lock) { - if (!isClosed) { - return baseSegment.asQueryableIndex(); + if (isClosed) { + return null; } - return null; + return baseSegment.asQueryableIndex(); } } @@ -104,11 +104,11 @@ public class ReferenceCountingSegment implements Segment public StorageAdapter asStorageAdapter() { synchronized (lock) { - if (!isClosed) { - return baseSegment.asStorageAdapter(); + if (isClosed) { + return null; } - return null; + return baseSegment.asStorageAdapter(); } } @@ -116,24 +116,18 @@ public class ReferenceCountingSegment implements Segment public void close() throws IOException { synchronized (lock) { - log.info("Trying to close %s", baseSegment.getIdentifier()); - if (!isClosed) { - if (numReferences > 0) { - log.info( - "%d references to %s still exist. Decrementing instead.", - numReferences, - baseSegment.getIdentifier() - ); - - decrement(); - } else { - log.info("Closing %s, numReferences: %d", baseSegment.getIdentifier(), numReferences); - - baseSegment.close(); - isClosed = true; - } - } else { + if (isClosed) { log.info("Failed to close, %s is closed already", baseSegment.getIdentifier()); + return; + } + + if (numReferences > 0) { + log.info("%d references to %s still exist. Decrementing.", numReferences, baseSegment.getIdentifier()); + + decrement(); + } else { + log.info("Closing %s", baseSegment.getIdentifier()); + innerClose(); } } } @@ -141,38 +135,50 @@ public class ReferenceCountingSegment implements Segment public Closeable increment() { synchronized (lock) { - if (!isClosed) { - numReferences++; - final AtomicBoolean decrementOnce = new AtomicBoolean(false); - return new Closeable() - { - @Override - public void close() throws IOException - { - if (decrementOnce.compareAndSet(false, true)) { - decrement(); - } - } - }; + if (isClosed) { + return null; } - return null; + numReferences++; + final AtomicBoolean decrementOnce = new AtomicBoolean(false); + return new Closeable() + { + @Override + public void close() throws IOException + { + if (decrementOnce.compareAndSet(false, true)) { + decrement(); + } + } + }; } } private void decrement() { synchronized (lock) { - if (!isClosed) { - if (--numReferences < 0) { - try { - close(); - } - catch (Exception e) { - log.error("Unable to close queryable index %s", getIdentifier()); - } + if (isClosed) { + return; + } + + if (--numReferences < 0) { + try { + innerClose(); + } + catch (Exception e) { + log.error("Unable to close queryable index %s", getIdentifier()); } } } } + + private void innerClose() throws IOException + { + synchronized (lock) { + log.info("Closing %s, numReferences: %d", baseSegment.getIdentifier(), numReferences); + + isClosed = true; + baseSegment.close(); + } + } } \ No newline at end of file diff --git a/server/src/main/java/com/metamx/druid/query/ReferenceCountingSegmentQueryRunner.java b/server/src/main/java/com/metamx/druid/query/ReferenceCountingSegmentQueryRunner.java index afcc967e054..8b2879dd493 100644 --- a/server/src/main/java/com/metamx/druid/query/ReferenceCountingSegmentQueryRunner.java +++ b/server/src/main/java/com/metamx/druid/query/ReferenceCountingSegmentQueryRunner.java @@ -12,7 +12,7 @@ import java.io.Closeable; */ public class ReferenceCountingSegmentQueryRunner implements QueryRunner { - private final QueryRunner runner; + private final QueryRunnerFactory> factory; private final ReferenceCountingSegment adapter; public ReferenceCountingSegmentQueryRunner( @@ -20,9 +20,8 @@ public class ReferenceCountingSegmentQueryRunner implements QueryRunner ReferenceCountingSegment adapter ) { + this.factory = factory; this.adapter = adapter; - - this.runner = factory.createRunner(adapter); } @Override @@ -30,7 +29,7 @@ public class ReferenceCountingSegmentQueryRunner implements QueryRunner { final Closeable closeable = adapter.increment(); try { - final Sequence baseSequence = runner.run(query); + final Sequence baseSequence = factory.createRunner(adapter).run(query); return new ResourceClosingSequence(baseSequence, closeable); } diff --git a/server/src/test/java/com/metamx/druid/coordination/ServerManagerTest.java b/server/src/test/java/com/metamx/druid/coordination/ServerManagerTest.java index adb9cb0c88f..f2cd54f4551 100644 --- a/server/src/test/java/com/metamx/druid/coordination/ServerManagerTest.java +++ b/server/src/test/java/com/metamx/druid/coordination/ServerManagerTest.java @@ -73,6 +73,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; /** */ @@ -262,7 +263,7 @@ public class ServerManagerTest ) ); - queryNotifyLatch.await(); + queryNotifyLatch.await(25, TimeUnit.MILLISECONDS); Assert.assertEquals(1, factory.getSegmentReferences().size()); @@ -301,7 +302,7 @@ public class ServerManagerTest ) ); - queryNotifyLatch.await(); + queryNotifyLatch.await(25, TimeUnit.MILLISECONDS); Assert.assertEquals(1, factory.getSegmentReferences().size()); @@ -344,7 +345,7 @@ public class ServerManagerTest ) ); - queryNotifyLatch.await(); + queryNotifyLatch.await(25, TimeUnit.MILLISECONDS); Assert.assertEquals(1, factory.getSegmentReferences().size()); @@ -378,7 +379,7 @@ public class ServerManagerTest private void waitForTestVerificationAndCleanup(Future future) { try { - queryNotifyLatch.await(); + queryNotifyLatch.await(25, TimeUnit.MILLISECONDS); queryWaitYieldLatch.countDown(); queryWaitLatch.countDown(); future.get(); @@ -505,13 +506,13 @@ public class ServerManagerTest if (!(adapter instanceof ReferenceCountingSegment)) { throw new IAE("Expected instance of ReferenceCountingSegment, got %s", adapter.getClass()); } - segmentReferences.add((ReferenceCountingSegment) adapter); - adapters.add((SegmentForTesting) ((ReferenceCountingSegment) adapter).getBaseSegment()); + final ReferenceCountingSegment segment = (ReferenceCountingSegment) adapter; + + Assert.assertTrue(segment.getNumReferences() > 0); + segmentReferences.add(segment); + adapters.add((SegmentForTesting) segment.getBaseSegment()); return new BlockingQueryRunner>( - new NoopQueryRunner>(), - waitLatch, - waitYieldLatch, - notifyLatch + new NoopQueryRunner>(), waitLatch, waitYieldLatch, notifyLatch ); } @@ -702,7 +703,7 @@ public class ServerManagerTest notifyLatch.countDown(); try { - waitYieldLatch.await(); + waitYieldLatch.await(25, TimeUnit.MILLISECONDS); } catch (Exception e) { throw Throwables.propagate(e); @@ -715,7 +716,7 @@ public class ServerManagerTest public OutType get() { try { - waitLatch.await(); + waitLatch.await(25, TimeUnit.MILLISECONDS); } catch (Exception e) { throw Throwables.propagate(e);