mirror of https://github.com/apache/druid.git
1) Add check in ServerManagerTest to make sure that the Segment has been "checked out" before the factory ever sees it.
2) Some code readability changes to ReferenceCountingSegment
This commit is contained in:
parent
a5855fb749
commit
145e08682c
|
@ -46,11 +46,11 @@ public class ReferenceCountingSegment implements Segment
|
|||
public Segment getBaseSegment()
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (!isClosed) {
|
||||
return baseSegment;
|
||||
if (isClosed) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return null;
|
||||
return baseSegment;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -68,11 +68,11 @@ public class ReferenceCountingSegment implements Segment
|
|||
public String getIdentifier()
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (!isClosed) {
|
||||
return baseSegment.getIdentifier();
|
||||
if (isClosed) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return null;
|
||||
return baseSegment.getIdentifier();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -80,11 +80,11 @@ public class ReferenceCountingSegment implements Segment
|
|||
public Interval getDataInterval()
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (!isClosed) {
|
||||
return baseSegment.getDataInterval();
|
||||
if (isClosed) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return null;
|
||||
return baseSegment.getDataInterval();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -92,11 +92,11 @@ public class ReferenceCountingSegment implements Segment
|
|||
public QueryableIndex asQueryableIndex()
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (!isClosed) {
|
||||
return baseSegment.asQueryableIndex();
|
||||
if (isClosed) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return null;
|
||||
return baseSegment.asQueryableIndex();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -104,11 +104,11 @@ public class ReferenceCountingSegment implements Segment
|
|||
public StorageAdapter asStorageAdapter()
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (!isClosed) {
|
||||
return baseSegment.asStorageAdapter();
|
||||
if (isClosed) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return null;
|
||||
return baseSegment.asStorageAdapter();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -116,24 +116,18 @@ public class ReferenceCountingSegment implements Segment
|
|||
public void close() throws IOException
|
||||
{
|
||||
synchronized (lock) {
|
||||
log.info("Trying to close %s", baseSegment.getIdentifier());
|
||||
if (!isClosed) {
|
||||
if (numReferences > 0) {
|
||||
log.info(
|
||||
"%d references to %s still exist. Decrementing instead.",
|
||||
numReferences,
|
||||
baseSegment.getIdentifier()
|
||||
);
|
||||
|
||||
decrement();
|
||||
} else {
|
||||
log.info("Closing %s, numReferences: %d", baseSegment.getIdentifier(), numReferences);
|
||||
|
||||
baseSegment.close();
|
||||
isClosed = true;
|
||||
}
|
||||
} else {
|
||||
if (isClosed) {
|
||||
log.info("Failed to close, %s is closed already", baseSegment.getIdentifier());
|
||||
return;
|
||||
}
|
||||
|
||||
if (numReferences > 0) {
|
||||
log.info("%d references to %s still exist. Decrementing.", numReferences, baseSegment.getIdentifier());
|
||||
|
||||
decrement();
|
||||
} else {
|
||||
log.info("Closing %s", baseSegment.getIdentifier());
|
||||
innerClose();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -141,38 +135,50 @@ public class ReferenceCountingSegment implements Segment
|
|||
public Closeable increment()
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (!isClosed) {
|
||||
numReferences++;
|
||||
final AtomicBoolean decrementOnce = new AtomicBoolean(false);
|
||||
return new Closeable()
|
||||
{
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
if (decrementOnce.compareAndSet(false, true)) {
|
||||
decrement();
|
||||
}
|
||||
}
|
||||
};
|
||||
if (isClosed) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return null;
|
||||
numReferences++;
|
||||
final AtomicBoolean decrementOnce = new AtomicBoolean(false);
|
||||
return new Closeable()
|
||||
{
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
if (decrementOnce.compareAndSet(false, true)) {
|
||||
decrement();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
private void decrement()
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (!isClosed) {
|
||||
if (--numReferences < 0) {
|
||||
try {
|
||||
close();
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error("Unable to close queryable index %s", getIdentifier());
|
||||
}
|
||||
if (isClosed) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (--numReferences < 0) {
|
||||
try {
|
||||
innerClose();
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error("Unable to close queryable index %s", getIdentifier());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void innerClose() throws IOException
|
||||
{
|
||||
synchronized (lock) {
|
||||
log.info("Closing %s, numReferences: %d", baseSegment.getIdentifier(), numReferences);
|
||||
|
||||
isClosed = true;
|
||||
baseSegment.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -12,7 +12,7 @@ import java.io.Closeable;
|
|||
*/
|
||||
public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
|
||||
{
|
||||
private final QueryRunner<T> runner;
|
||||
private final QueryRunnerFactory<T, Query<T>> factory;
|
||||
private final ReferenceCountingSegment adapter;
|
||||
|
||||
public ReferenceCountingSegmentQueryRunner(
|
||||
|
@ -20,9 +20,8 @@ public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
|
|||
ReferenceCountingSegment adapter
|
||||
)
|
||||
{
|
||||
this.factory = factory;
|
||||
this.adapter = adapter;
|
||||
|
||||
this.runner = factory.createRunner(adapter);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -30,7 +29,7 @@ public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
|
|||
{
|
||||
final Closeable closeable = adapter.increment();
|
||||
try {
|
||||
final Sequence<T> baseSequence = runner.run(query);
|
||||
final Sequence<T> baseSequence = factory.createRunner(adapter).run(query);
|
||||
|
||||
return new ResourceClosingSequence<T>(baseSequence, closeable);
|
||||
}
|
||||
|
|
|
@ -73,6 +73,7 @@ import java.util.concurrent.CountDownLatch;
|
|||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -262,7 +263,7 @@ public class ServerManagerTest
|
|||
)
|
||||
);
|
||||
|
||||
queryNotifyLatch.await();
|
||||
queryNotifyLatch.await(25, TimeUnit.MILLISECONDS);
|
||||
|
||||
Assert.assertEquals(1, factory.getSegmentReferences().size());
|
||||
|
||||
|
@ -301,7 +302,7 @@ public class ServerManagerTest
|
|||
)
|
||||
);
|
||||
|
||||
queryNotifyLatch.await();
|
||||
queryNotifyLatch.await(25, TimeUnit.MILLISECONDS);
|
||||
|
||||
Assert.assertEquals(1, factory.getSegmentReferences().size());
|
||||
|
||||
|
@ -344,7 +345,7 @@ public class ServerManagerTest
|
|||
)
|
||||
);
|
||||
|
||||
queryNotifyLatch.await();
|
||||
queryNotifyLatch.await(25, TimeUnit.MILLISECONDS);
|
||||
|
||||
Assert.assertEquals(1, factory.getSegmentReferences().size());
|
||||
|
||||
|
@ -378,7 +379,7 @@ public class ServerManagerTest
|
|||
private void waitForTestVerificationAndCleanup(Future future)
|
||||
{
|
||||
try {
|
||||
queryNotifyLatch.await();
|
||||
queryNotifyLatch.await(25, TimeUnit.MILLISECONDS);
|
||||
queryWaitYieldLatch.countDown();
|
||||
queryWaitLatch.countDown();
|
||||
future.get();
|
||||
|
@ -505,13 +506,13 @@ public class ServerManagerTest
|
|||
if (!(adapter instanceof ReferenceCountingSegment)) {
|
||||
throw new IAE("Expected instance of ReferenceCountingSegment, got %s", adapter.getClass());
|
||||
}
|
||||
segmentReferences.add((ReferenceCountingSegment) adapter);
|
||||
adapters.add((SegmentForTesting) ((ReferenceCountingSegment) adapter).getBaseSegment());
|
||||
final ReferenceCountingSegment segment = (ReferenceCountingSegment) adapter;
|
||||
|
||||
Assert.assertTrue(segment.getNumReferences() > 0);
|
||||
segmentReferences.add(segment);
|
||||
adapters.add((SegmentForTesting) segment.getBaseSegment());
|
||||
return new BlockingQueryRunner<Result<SearchResultValue>>(
|
||||
new NoopQueryRunner<Result<SearchResultValue>>(),
|
||||
waitLatch,
|
||||
waitYieldLatch,
|
||||
notifyLatch
|
||||
new NoopQueryRunner<Result<SearchResultValue>>(), waitLatch, waitYieldLatch, notifyLatch
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -702,7 +703,7 @@ public class ServerManagerTest
|
|||
notifyLatch.countDown();
|
||||
|
||||
try {
|
||||
waitYieldLatch.await();
|
||||
waitYieldLatch.await(25, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
|
@ -715,7 +716,7 @@ public class ServerManagerTest
|
|||
public OutType get()
|
||||
{
|
||||
try {
|
||||
waitLatch.await();
|
||||
waitLatch.await(25, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
|
|
Loading…
Reference in New Issue