Fix CombiningSequence.close on single element sequences. (#2969)

Regression introduced by #2892.
This commit is contained in:
Gian Merlino 2016-05-13 23:12:30 -07:00 committed by Fangjin Yang
parent 681ffdb417
commit a54381a084
2 changed files with 27 additions and 12 deletions

View File

@ -140,9 +140,7 @@ public class CombiningSequence<T> implements Sequence<T>
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
if (finalYielder != null) { yielder.close();
finalYielder.close();
}
} }
}; };
} }

View File

@ -25,6 +25,7 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.metamx.common.Pair; import com.metamx.common.Pair;
import com.metamx.common.guava.ResourceClosingSequence;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder; import com.metamx.common.guava.Yielder;
@ -36,11 +37,14 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class CombiningSequenceTest public class CombiningSequenceTest
@ -61,7 +65,7 @@ public class CombiningSequenceTest
} }
@Test @Test
public void testMerge() throws IOException public void testMerge() throws Exception
{ {
List<Pair<Integer, Integer>> pairs = Arrays.asList( List<Pair<Integer, Integer>> pairs = Arrays.asList(
Pair.of(0, 1), Pair.of(0, 1),
@ -87,7 +91,7 @@ public class CombiningSequenceTest
} }
@Test @Test
public void testNoMergeOne() throws IOException public void testNoMergeOne() throws Exception
{ {
List<Pair<Integer, Integer>> pairs = Arrays.asList( List<Pair<Integer, Integer>> pairs = Arrays.asList(
Pair.of(0, 1) Pair.of(0, 1)
@ -101,7 +105,7 @@ public class CombiningSequenceTest
} }
@Test @Test
public void testMergeMany() throws IOException public void testMergeMany() throws Exception
{ {
List<Pair<Integer, Integer>> pairs = Arrays.asList( List<Pair<Integer, Integer>> pairs = Arrays.asList(
Pair.of(0, 6), Pair.of(0, 6),
@ -125,7 +129,7 @@ public class CombiningSequenceTest
} }
@Test @Test
public void testNoMergeTwo() throws IOException public void testNoMergeTwo() throws Exception
{ {
List<Pair<Integer, Integer>> pairs = Arrays.asList( List<Pair<Integer, Integer>> pairs = Arrays.asList(
Pair.of(0, 1), Pair.of(0, 1),
@ -141,7 +145,7 @@ public class CombiningSequenceTest
} }
@Test @Test
public void testMergeTwo() throws IOException public void testMergeTwo() throws Exception
{ {
List<Pair<Integer, Integer>> pairs = Arrays.asList( List<Pair<Integer, Integer>> pairs = Arrays.asList(
Pair.of(0, 1), Pair.of(0, 1),
@ -156,7 +160,7 @@ public class CombiningSequenceTest
} }
@Test @Test
public void testMergeSomeThingsMergedAtEnd() throws IOException public void testMergeSomeThingsMergedAtEnd() throws Exception
{ {
List<Pair<Integer, Integer>> pairs = Arrays.asList( List<Pair<Integer, Integer>> pairs = Arrays.asList(
Pair.of(0, 1), Pair.of(0, 1),
@ -193,7 +197,7 @@ public class CombiningSequenceTest
} }
private void testCombining(List<Pair<Integer, Integer>> pairs, List<Pair<Integer, Integer>> expected) private void testCombining(List<Pair<Integer, Integer>> pairs, List<Pair<Integer, Integer>> expected)
throws IOException throws Exception
{ {
for (int limit = 0; limit < expected.size() + 1; limit++) { for (int limit = 0; limit < expected.size() + 1; limit++) {
// limit = 0 doesn't work properly; it returns 1 element // limit = 0 doesn't work properly; it returns 1 element
@ -211,11 +215,22 @@ public class CombiningSequenceTest
List<Pair<Integer, Integer>> pairs, List<Pair<Integer, Integer>> pairs,
List<Pair<Integer, Integer>> expected, List<Pair<Integer, Integer>> expected,
int limit int limit
) throws IOException ) throws Exception
{ {
// Test that closing works too
final CountDownLatch closed = new CountDownLatch(1);
final Closeable closeable = new Closeable()
{
@Override
public void close() throws IOException
{
closed.countDown();
}
};
Sequence<Pair<Integer, Integer>> seq = Sequences.limit( Sequence<Pair<Integer, Integer>> seq = Sequences.limit(
new CombiningSequence<>( new CombiningSequence<>(
Sequences.simple(pairs), new ResourceClosingSequence<>(Sequences.simple(pairs), closeable),
Ordering.natural().onResultOf(Pair.<Integer, Integer>lhsFn()), Ordering.natural().onResultOf(Pair.<Integer, Integer>lhsFn()),
new BinaryFn<Pair<Integer, Integer>, Pair<Integer, Integer>, Pair<Integer, Integer>>() new BinaryFn<Pair<Integer, Integer>, Pair<Integer, Integer>, Pair<Integer, Integer>>()
{ {
@ -294,5 +309,7 @@ public class CombiningSequenceTest
Assert.assertTrue(yielder.isDone()); Assert.assertTrue(yielder.isDone());
Assert.assertFalse(expectedVals.hasNext()); Assert.assertFalse(expectedVals.hasNext());
yielder.close(); yielder.close();
Assert.assertTrue("resource closed", closed.await(10000, TimeUnit.MILLISECONDS));
} }
} }