diff --git a/server/src/main/java/com/metamx/druid/coordination/ServerManager.java b/server/src/main/java/com/metamx/druid/coordination/ServerManager.java index 0feaed21782..5e7fc9e187a 100644 --- a/server/src/main/java/com/metamx/druid/coordination/ServerManager.java +++ b/server/src/main/java/com/metamx/druid/coordination/ServerManager.java @@ -196,7 +196,7 @@ public class ServerManager implements QuerySegmentWalker oldQueryable.close(); } catch (IOException e) { - throw new SegmentLoadingException(e, "Unable to close segment %s", segment.getIdentifier()); + log.makeAlert(e, "Unable to close segment %s", segment.getIdentifier()).emit(); } } else { log.info( diff --git a/server/src/main/java/com/metamx/druid/index/ReferenceCountingSegment.java b/server/src/main/java/com/metamx/druid/index/ReferenceCountingSegment.java index 6a7b16efbf3..74fe1ebd180 100644 --- a/server/src/main/java/com/metamx/druid/index/ReferenceCountingSegment.java +++ b/server/src/main/java/com/metamx/druid/index/ReferenceCountingSegment.java @@ -54,6 +54,11 @@ public class ReferenceCountingSegment implements Segment } } + public int getNumReferences() + { + return numReferences; + } + public boolean isClosed() { return isClosed; diff --git a/server/src/test/java/com/metamx/druid/coordination/ServerManagerTest.java b/server/src/test/java/com/metamx/druid/coordination/ServerManagerTest.java index 353679845a4..196cc1edf61 100644 --- a/server/src/test/java/com/metamx/druid/coordination/ServerManagerTest.java +++ b/server/src/test/java/com/metamx/druid/coordination/ServerManagerTest.java @@ -313,6 +313,42 @@ public class ServerManagerTest } } + @Test + public void testMultipleDrops() throws Exception + { + loadQueryable("test", "3", new Interval("2011-04-04/2011-04-05")); + + Future future = assertQueryable( + QueryGranularity.DAY, + "test", new Interval("2011-04-04/2011-04-06"), + ImmutableList.>of( + new Pair("3", new Interval("2011-04-04/2011-04-05")) + ) + ); + + queryNotifyLatch.await(); + + Assert.assertTrue(factory.getAdapters().size() == 1); + + for (SegmentForTesting segmentForTesting : factory.getAdapters()) { + Assert.assertFalse(segmentForTesting.isClosed()); + } + + dropQueryable("test", "3", new Interval("2011-04-04/2011-04-05")); + dropQueryable("test", "3", new Interval("2011-04-04/2011-04-05")); + + for (SegmentForTesting segmentForTesting : factory.getAdapters()) { + Assert.assertFalse(segmentForTesting.isClosed()); + } + + queryWaitLatch.countDown(); + future.get(); + + for (SegmentForTesting segmentForTesting : factory.getAdapters()) { + Assert.assertTrue(segmentForTesting.isClosed()); + } + } + private void waitForTestVerificationAndCleanup(Future future) { try { diff --git a/server/src/test/java/com/metamx/druid/index/ReferenceCountingSegmentTest.java b/server/src/test/java/com/metamx/druid/index/ReferenceCountingSegmentTest.java new file mode 100644 index 00000000000..c4c6b2ecd04 --- /dev/null +++ b/server/src/test/java/com/metamx/druid/index/ReferenceCountingSegmentTest.java @@ -0,0 +1,135 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.index; + +import com.google.common.base.Throwables; +import com.metamx.druid.StorageAdapter; +import junit.framework.Assert; +import org.joda.time.Interval; +import org.junit.Before; +import org.junit.Test; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + */ +public class ReferenceCountingSegmentTest +{ + private ReferenceCountingSegment segment; + private ExecutorService exec; + + @Before + public void setUp() throws Exception + { + segment = new ReferenceCountingSegment( + new Segment() + { + @Override + public String getIdentifier() + { + throw new UnsupportedOperationException(); + } + + @Override + public Interval getDataInterval() + { + throw new UnsupportedOperationException(); + } + + @Override + public QueryableIndex asQueryableIndex() + { + throw new UnsupportedOperationException(); + } + + @Override + public StorageAdapter asStorageAdapter() + { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException + { + } + } + ); + + exec = Executors.newSingleThreadExecutor(); + } + + @Test + public void testMultipleClose() throws Exception + { + Assert.assertFalse(segment.isClosed()); + final Closeable closeable = segment.increment(); + Assert.assertTrue(segment.getNumReferences() == 1); + + closeable.close(); + closeable.close(); + exec.submit( + new Runnable() + { + @Override + public void run() + { + try { + closeable.close(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + ); + Assert.assertTrue(segment.getNumReferences() == 0); + Assert.assertFalse(segment.isClosed()); + + segment.close(); + segment.close(); + exec.submit( + new Runnable() + { + @Override + public void run() + { + try { + segment.close(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + ); + + Assert.assertTrue(segment.getNumReferences() == 0); + Assert.assertTrue(segment.isClosed()); + + segment.increment(); + segment.increment(); + segment.increment(); + segment.close(); + Assert.assertTrue(segment.getNumReferences() == 0); + } +}