pair) {
+ Compressor compressor = pair.compressor;
+
+ if (compressor.getClass().isAssignableFrom(Lz4Compressor.class)
+ && (NativeCodeLoader.isNativeCodeLoaded()))
+ return true;
+
+ else if (compressor.getClass().isAssignableFrom(BuiltInZlibDeflater.class)
+ && NativeCodeLoader.isNativeCodeLoaded())
+ return true;
+
+ else if (compressor.getClass().isAssignableFrom(ZlibCompressor.class)) {
+ return ZlibFactory.isNativeZlibLoaded(new Configuration());
+ }
+ else if (compressor.getClass().isAssignableFrom(SnappyCompressor.class)
+ && isNativeSnappyLoadable())
+ return true;
+
+ return false;
+ }
+
+ abstract static class TesterCompressionStrategy {
+
+ protected final Logger logger = Logger.getLogger(getClass());
+
+ abstract void assertCompression(String name, Compressor compressor,
+ Decompressor decompressor, byte[] originalRawData);
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java
new file mode 100644
index 00000000000..551f282889e
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCodecPool {
+ private final String LEASE_COUNT_ERR =
+ "Incorrect number of leased (de)compressors";
+ DefaultCodec codec;
+
+ @Before
+ public void setup() {
+ this.codec = new DefaultCodec();
+ this.codec.setConf(new Configuration());
+ }
+
+ @Test(timeout = 1000)
+ public void testCompressorPoolCounts() {
+ // Get two compressors and return them
+ Compressor comp1 = CodecPool.getCompressor(codec);
+ Compressor comp2 = CodecPool.getCompressor(codec);
+ assertEquals(LEASE_COUNT_ERR, 2,
+ CodecPool.getLeasedCompressorsCount(codec));
+
+ CodecPool.returnCompressor(comp2);
+ assertEquals(LEASE_COUNT_ERR, 1,
+ CodecPool.getLeasedCompressorsCount(codec));
+
+ CodecPool.returnCompressor(comp1);
+ assertEquals(LEASE_COUNT_ERR, 0,
+ CodecPool.getLeasedCompressorsCount(codec));
+ }
+
+ @Test(timeout = 1000)
+ public void testDecompressorPoolCounts() {
+ // Get two decompressors and return them
+ Decompressor decomp1 = CodecPool.getDecompressor(codec);
+ Decompressor decomp2 = CodecPool.getDecompressor(codec);
+ assertEquals(LEASE_COUNT_ERR, 2,
+ CodecPool.getLeasedDecompressorsCount(codec));
+
+ CodecPool.returnDecompressor(decomp2);
+ assertEquals(LEASE_COUNT_ERR, 1,
+ CodecPool.getLeasedDecompressorsCount(codec));
+
+ CodecPool.returnDecompressor(decomp1);
+ assertEquals(LEASE_COUNT_ERR, 0,
+ CodecPool.getLeasedDecompressorsCount(codec));
+ }
+}
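For context, a minimal sketch of how CodecPool is typically used around stream creation outside of tests; the helper class, method name, and stream handling are illustrative assumptions, not part of this patch:

    import java.io.IOException;
    import java.io.OutputStream;
    import org.apache.hadoop.io.compress.CodecPool;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.io.compress.Compressor;

    public class CodecPoolUsageSketch {
      /** Compress data to rawOut with a pooled compressor, then return it to the pool. */
      public static void writeCompressed(CompressionCodec codec, OutputStream rawOut,
          byte[] data) throws IOException {
        Compressor compressor = CodecPool.getCompressor(codec);
        try {
          CompressionOutputStream out = codec.createOutputStream(rawOut, compressor);
          out.write(data, 0, data.length);
          out.finish();
        } finally {
          // The leased-compressors count checked by the test above drops back here.
          CodecPool.returnCompressor(compressor);
        }
      }
    }

Returning the compressor in a finally block is what keeps the leased count asserted by the new test from growing along error paths.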
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressorDecompressor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressorDecompressor.java
new file mode 100644
index 00000000000..a8ac993c47b
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressorDecompressor.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import static org.junit.Assert.fail;
+import java.util.Random;
+import org.apache.hadoop.io.compress.CompressDecompressTester.CompressionTestStrategy;
+import org.apache.hadoop.io.compress.lz4.Lz4Compressor;
+import org.apache.hadoop.io.compress.lz4.Lz4Decompressor;
+import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
+import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
+import org.apache.hadoop.io.compress.zlib.BuiltInZlibDeflater;
+import org.apache.hadoop.io.compress.zlib.BuiltInZlibInflater;
+import org.junit.Test;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Test for the pairs:
+ *
+ * SnappyCompressor/SnappyDecompressor
+ * Lz4Compressor/Lz4Decompressor
+ * BuiltInZlibDeflater/BuiltInZlibInflater
+ *
+ * Note: we can't use ZlibCompressor/ZlibDecompressor here
+ * because its constructor can throw an exception if the native libraries are not found.
+ * The ZlibCompressor/ZlibDecompressor pair is tested in {@code TestZlibCompressorDecompressor}.
+ */
+public class TestCompressorDecompressor {
+
+ private static final Random rnd = new Random(12345L);
+
+ @Test
+ public void testCompressorDecompressor() {
+ // fixed amount of raw test data
+ int SIZE = 44 * 1024;
+
+ byte[] rawData = generate(SIZE);
+ try {
+ CompressDecompressTester.of(rawData)
+ .withCompressDecompressPair(new SnappyCompressor(), new SnappyDecompressor())
+ .withCompressDecompressPair(new Lz4Compressor(), new Lz4Decompressor())
+ .withCompressDecompressPair(new BuiltInZlibDeflater(), new BuiltInZlibInflater())
+ .withTestCases(ImmutableSet.of(CompressionTestStrategy.COMPRESS_DECOMPRESS_SINGLE_BLOCK,
+ CompressionTestStrategy.COMPRESS_DECOMPRESS_BLOCK,
+ CompressionTestStrategy.COMPRESS_DECOMPRESS_ERRORS,
+ CompressionTestStrategy.COMPRESS_DECOMPRESS_WITH_EMPTY_STREAM))
+ .test();
+
+ } catch (Exception ex) {
+ fail("testCompressorDecompressor error !!!" + ex);
+ }
+ }
+
+ @Test
+ public void testCompressorDecompressorWithExeedBufferLimit() {
+ int BYTE_SIZE = 100 * 1024;
+ byte[] rawData = generate(BYTE_SIZE);
+ try {
+ CompressDecompressTester.of(rawData)
+ .withCompressDecompressPair(
+ new SnappyCompressor(BYTE_SIZE + BYTE_SIZE / 2),
+ new SnappyDecompressor(BYTE_SIZE + BYTE_SIZE / 2))
+ .withCompressDecompressPair(new Lz4Compressor(BYTE_SIZE),
+ new Lz4Decompressor(BYTE_SIZE))
+ .withTestCases(ImmutableSet.of(CompressionTestStrategy.COMPRESS_DECOMPRESS_SINGLE_BLOCK,
+ CompressionTestStrategy.COMPRESS_DECOMPRESS_BLOCK,
+ CompressionTestStrategy.COMPRESS_DECOMPRESS_ERRORS,
+ CompressionTestStrategy.COMPRESS_DECOMPRESS_WITH_EMPTY_STREAM))
+ .test();
+
+ } catch (Exception ex) {
+ fail("testCompressorDecompressorWithExeedBufferLimit error !!!" + ex);
+ }
+ }
+
+ public static byte[] generate(int size) {
+ byte[] array = new byte[size];
+ for (int i = 0; i < size; i++)
+ array[i] = (byte) rnd.nextInt(16);
+ return array;
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/lz4/TestLz4CompressorDecompressor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/lz4/TestLz4CompressorDecompressor.java
new file mode 100644
index 00000000000..e8555b23887
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/lz4/TestLz4CompressorDecompressor.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress.lz4;
+
+import static org.junit.Assert.*;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.compress.BlockCompressorStream;
+import org.apache.hadoop.io.compress.BlockDecompressorStream;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.lz4.Lz4Compressor;
+import org.apache.hadoop.io.compress.lz4.Lz4Decompressor;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assume.*;
+
+public class TestLz4CompressorDecompressor {
+
+ private static final Random rnd = new Random(12345L);
+
+ @Before
+ public void before() {
+ assumeTrue(Lz4Codec.isNativeCodeLoaded());
+ }
+
+ //test on NullPointerException in {@code compressor.setInput()}
+ @Test
+ public void testCompressorSetInputNullPointerException() {
+ try {
+ Lz4Compressor compressor = new Lz4Compressor();
+ compressor.setInput(null, 0, 10);
+ fail("testCompressorSetInputNullPointerException error !!!");
+ } catch (NullPointerException ex) {
+ // expected
+ } catch (Exception e) {
+ fail("testCompressorSetInputNullPointerException ex error !!!");
+ }
+ }
+
+ //test on NullPointerException in {@code decompressor.setInput()}
+ @Test
+ public void testDecompressorSetInputNullPointerException() {
+ try {
+ Lz4Decompressor decompressor = new Lz4Decompressor();
+ decompressor.setInput(null, 0, 10);
+ fail("testDecompressorSetInputNullPointerException error !!!");
+ } catch (NullPointerException ex) {
+ // expected
+ } catch (Exception e) {
+ fail("testDecompressorSetInputNullPointerException ex error !!!");
+ }
+ }
+
+ //test on ArrayIndexOutOfBoundsException in {@code compressor.setInput()}
+ @Test
+ public void testCompressorSetInputAIOBException() {
+ try {
+ Lz4Compressor compressor = new Lz4Compressor();
+ compressor.setInput(new byte[] {}, -5, 10);
+ fail("testCompressorSetInputAIOBException error !!!");
+ } catch (ArrayIndexOutOfBoundsException ex) {
+ // expected
+ } catch (Exception ex) {
+ fail("testCompressorSetInputAIOBException ex error !!!");
+ }
+ }
+
+ //test on ArrayIndexOutOfBoundsException in {@code decompressor.setInput()}
+ @Test
+ public void testDecompressorSetInputAIOUBException() {
+ try {
+ Lz4Decompressor decompressor = new Lz4Decompressor();
+ decompressor.setInput(new byte[] {}, -5, 10);
+ fail("testDecompressorSetInputAIOBException error !!!");
+ } catch (ArrayIndexOutOfBoundsException ex) {
+ // expected
+ } catch (Exception e) {
+ fail("testDecompressorSetInputAIOBException ex error !!!");
+ }
+ }
+
+ //test on NullPointerException in {@code compressor.compress()}
+ @Test
+ public void testCompressorCompressNullPointerException() {
+ try {
+ Lz4Compressor compressor = new Lz4Compressor();
+ byte[] bytes = generate(1024 * 6);
+ compressor.setInput(bytes, 0, bytes.length);
+ compressor.compress(null, 0, 0);
+ fail("testCompressorCompressNullPointerException error !!!");
+ } catch (NullPointerException ex) {
+ // expected
+ } catch (Exception e) {
+ fail("testCompressorCompressNullPointerException ex error !!!");
+ }
+ }
+
+ //test on NullPointerException in {@code decompressor.decompress()}
+ @Test
+ public void testDecompressorCompressNullPointerException() {
+ try {
+ Lz4Decompressor decompressor = new Lz4Decompressor();
+ byte[] bytes = generate(1024 * 6);
+ decompressor.setInput(bytes, 0, bytes.length);
+ decompressor.decompress(null, 0, 0);
+ fail("testDecompressorCompressNullPointerException error !!!");
+ } catch (NullPointerException ex) {
+ // expected
+ } catch (Exception e) {
+ fail("testDecompressorCompressNullPointerException ex error !!!");
+ }
+ }
+
+ //test on ArrayIndexOutOfBoundsException in {@code compressor.compress()}
+ @Test
+ public void testCompressorCompressAIOBException() {
+ try {
+ Lz4Compressor compressor = new Lz4Compressor();
+ byte[] bytes = generate(1024 * 6);
+ compressor.setInput(bytes, 0, bytes.length);
+ compressor.compress(new byte[] {}, 0, -1);
+ fail("testCompressorCompressAIOBException error !!!");
+ } catch (ArrayIndexOutOfBoundsException ex) {
+ // expected
+ } catch (Exception e) {
+ fail("testCompressorCompressAIOBException ex error !!!");
+ }
+ }
+
+ //test on ArrayIndexOutOfBoundsException in decompressor.decompress()
+ @Test
+ public void testDecompressorCompressAIOBException() {
+ try {
+ Lz4Decompressor decompressor = new Lz4Decompressor();
+ byte[] bytes = generate(1024 * 6);
+ decompressor.setInput(bytes, 0, bytes.length);
+ decompressor.decompress(new byte[] {}, 0, -1);
+ fail("testDecompressorCompressAIOBException error !!!");
+ } catch (ArrayIndexOutOfBoundsException ex) {
+ // expected
+ } catch (Exception e) {
+ fail("testDecompressorCompressAIOBException ex error !!!");
+ }
+ }
+
+ // test Lz4Compressor compressor.compress()
+ @Test
+ public void testSetInputWithBytesSizeMoreThenDefaultLz4CompressorByfferSize() {
+ int BYTES_SIZE = 1024 * 64 + 1;
+ try {
+ Lz4Compressor compressor = new Lz4Compressor();
+ byte[] bytes = generate(BYTES_SIZE);
+ assertTrue("needsInput error !!!", compressor.needsInput());
+ compressor.setInput(bytes, 0, bytes.length);
+ byte[] emptyBytes = new byte[BYTES_SIZE];
+ int csize = compressor.compress(emptyBytes, 0, bytes.length);
+ assertTrue(
+ "testSetInputWithBytesSizeMoreThenDefaultLz4CompressorByfferSize error !!!",
+ csize != 0);
+ } catch (Exception ex) {
+ fail("testSetInputWithBytesSizeMoreThenDefaultLz4CompressorByfferSize ex error !!!");
+ }
+ }
+
+ // test compress/decompress process
+ @Test
+ public void testCompressDecompress() {
+ int BYTE_SIZE = 1024 * 54;
+ byte[] bytes = generate(BYTE_SIZE);
+ Lz4Compressor compressor = new Lz4Compressor();
+ try {
+ compressor.setInput(bytes, 0, bytes.length);
+ assertTrue("Lz4CompressDecompress getBytesRead error !!!",
+ compressor.getBytesRead() > 0);
+ assertTrue(
+ "Lz4CompressDecompress getBytesWritten before compress error !!!",
+ compressor.getBytesWritten() == 0);
+
+ byte[] compressed = new byte[BYTE_SIZE];
+ int cSize = compressor.compress(compressed, 0, compressed.length);
+ assertTrue(
+ "Lz4CompressDecompress getBytesWritten after compress error !!!",
+ compressor.getBytesWritten() > 0);
+ Lz4Decompressor decompressor = new Lz4Decompressor();
+ // feed the decompressor only the compressed data, whose length is cSize
+ decompressor.setInput(compressed, 0, cSize);
+ byte[] decompressed = new byte[BYTE_SIZE];
+ decompressor.decompress(decompressed, 0, decompressed.length);
+
+ assertTrue("testLz4CompressDecompress finished error !!!", decompressor.finished());
+ assertArrayEquals(bytes, decompressed);
+ compressor.reset();
+ decompressor.reset();
+ assertTrue("decompressor getRemaining error !!!",decompressor.getRemaining() == 0);
+ } catch (Exception e) {
+ fail("testLz4CompressDecompress ex error!!!");
+ }
+ }
+
+ // test compress/decompress with empty stream
+ @Test
+ public void testCompressorDecompressorEmptyStreamLogic() {
+ ByteArrayInputStream bytesIn = null;
+ ByteArrayOutputStream bytesOut = null;
+ byte[] buf = null;
+ BlockDecompressorStream blockDecompressorStream = null;
+ try {
+ // compress empty stream
+ bytesOut = new ByteArrayOutputStream();
+ BlockCompressorStream blockCompressorStream = new BlockCompressorStream(
+ bytesOut, new Lz4Compressor(), 1024, 0);
+ // close without write
+ blockCompressorStream.close();
+ // check compressed output
+ buf = bytesOut.toByteArray();
+ assertEquals("empty stream compressed output size != 4", 4, buf.length);
+ // use compressed output as input for decompression
+ bytesIn = new ByteArrayInputStream(buf);
+ // create decompression stream
+ blockDecompressorStream = new BlockDecompressorStream(bytesIn,
+ new Lz4Decompressor(), 1024);
+ // no data is available because the stream was closed without any write
+ assertEquals("return value is not -1", -1, blockDecompressorStream.read());
+ } catch (Exception e) {
+ fail("testCompressorDecompressorEmptyStreamLogic ex error !!!"
+ + e.getMessage());
+ } finally {
+ if (blockDecompressorStream != null)
+ try {
+ bytesIn.close();
+ bytesOut.close();
+ blockDecompressorStream.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+
+ // test compress/decompress process through CompressionOutputStream/CompressionInputStream api
+ @Test
+ public void testCompressorDecopressorLogicWithCompressionStreams() {
+ DataOutputStream deflateOut = null;
+ DataInputStream inflateIn = null;
+ int BYTE_SIZE = 1024 * 100;
+ byte[] bytes = generate(BYTE_SIZE);
+ int bufferSize = 262144;
+ int compressionOverhead = (bufferSize / 6) + 32;
+ try {
+ DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
+ CompressionOutputStream deflateFilter = new BlockCompressorStream(
+ compressedDataBuffer, new Lz4Compressor(bufferSize), bufferSize,
+ compressionOverhead);
+ deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter));
+ deflateOut.write(bytes, 0, bytes.length);
+ deflateOut.flush();
+ deflateFilter.finish();
+
+ DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
+ deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
+ compressedDataBuffer.getLength());
+
+ CompressionInputStream inflateFilter = new BlockDecompressorStream(
+ deCompressedDataBuffer, new Lz4Decompressor(bufferSize), bufferSize);
+
+ inflateIn = new DataInputStream(new BufferedInputStream(inflateFilter));
+
+ byte[] result = new byte[BYTE_SIZE];
+ inflateIn.read(result);
+
+ assertArrayEquals("original array not equals compress/decompressed array", result,
+ bytes);
+ } catch (IOException e) {
+ fail("testLz4CompressorDecopressorLogicWithCompressionStreams ex error !!!");
+ } finally {
+ try {
+ if (deflateOut != null)
+ deflateOut.close();
+ if (inflateIn != null)
+ inflateIn.close();
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ public static byte[] generate(int size) {
+ byte[] array = new byte[size];
+ for (int i = 0; i < size; i++)
+ array[i] = (byte)rnd.nextInt(16);
+ return array;
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java
new file mode 100644
index 00000000000..6e792d1e4ea
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress.zlib;
+
+import static org.junit.Assert.*;
+import static org.junit.Assume.*;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.compress.CompressDecompressTester;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DecompressorStream;
+import org.apache.hadoop.io.compress.CompressDecompressTester.CompressionTestStrategy;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
+import org.junit.Before;
+import org.junit.Test;
+import com.google.common.collect.ImmutableSet;
+
+public class TestZlibCompressorDecompressor {
+
+ private static final Random random = new Random(12345L);
+
+ @Before
+ public void before() {
+ assumeTrue(ZlibFactory.isNativeZlibLoaded(new Configuration()));
+ }
+
+ @Test
+ public void testZlibCompressorDecompressor() {
+ try {
+ int SIZE = 44 * 1024;
+ byte[] rawData = generate(SIZE);
+
+ CompressDecompressTester.of(rawData)
+ .withCompressDecompressPair(new ZlibCompressor(), new ZlibDecompressor())
+ .withTestCases(ImmutableSet.of(CompressionTestStrategy.COMPRESS_DECOMPRESS_SINGLE_BLOCK,
+ CompressionTestStrategy.COMPRESS_DECOMPRESS_BLOCK,
+ CompressionTestStrategy.COMPRESS_DECOMPRESS_ERRORS,
+ CompressionTestStrategy.COMPRESS_DECOMPRESS_WITH_EMPTY_STREAM))
+ .test();
+ } catch (Exception ex) {
+ fail("testCompressorDecompressor error !!!" + ex);
+ }
+ }
+
+ @Test
+ public void testCompressorDecompressorWithExeedBufferLimit() {
+ int BYTE_SIZE = 100 * 1024;
+ byte[] rawData = generate(BYTE_SIZE);
+ try {
+ CompressDecompressTester.of(rawData)
+ .withCompressDecompressPair(
+ new ZlibCompressor(
+ org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel.BEST_COMPRESSION,
+ CompressionStrategy.DEFAULT_STRATEGY,
+ org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionHeader.DEFAULT_HEADER,
+ BYTE_SIZE),
+ new ZlibDecompressor(
+ org.apache.hadoop.io.compress.zlib.ZlibDecompressor.CompressionHeader.DEFAULT_HEADER,
+ BYTE_SIZE))
+ .withTestCases(ImmutableSet.of(CompressionTestStrategy.COMPRESS_DECOMPRESS_SINGLE_BLOCK,
+ CompressionTestStrategy.COMPRESS_DECOMPRESS_BLOCK,
+ CompressionTestStrategy.COMPRESS_DECOMPRESS_ERRORS,
+ CompressionTestStrategy.COMPRESS_DECOMPRESS_WITH_EMPTY_STREAM))
+ .test();
+ } catch (Exception ex) {
+ fail("testCompressorDecompressorWithExeedBufferLimit error !!!" + ex);
+ }
+ }
+
+
+ @Test
+ public void testZlibCompressorDecompressorWithConfiguration() {
+ Configuration conf = new Configuration();
+ conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);
+ if (ZlibFactory.isNativeZlibLoaded(conf)) {
+ byte[] rawData;
+ int tryNumber = 5;
+ int BYTE_SIZE = 10 * 1024;
+ Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf);
+ Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf);
+ rawData = generate(BYTE_SIZE);
+ try {
+ for (int i = 0; i < tryNumber; i++)
+ compressDecompressZlib(rawData, (ZlibCompressor) zlibCompressor,
+ (ZlibDecompressor) zlibDecompressor);
+ zlibCompressor.reinit(conf);
+ } catch (Exception ex) {
+ fail("testZlibCompressorDecompressorWithConfiguration ex error " + ex);
+ }
+ } else {
+ assertTrue("ZlibFactory is using native libs against request",
+ ZlibFactory.isNativeZlibLoaded(conf));
+ }
+ }
+
+ @Test
+ public void testZlibCompressDecompress() {
+ byte[] rawData = null;
+ int rawDataSize = 0;
+ rawDataSize = 1024 * 64;
+ rawData = generate(rawDataSize);
+ try {
+ ZlibCompressor compressor = new ZlibCompressor();
+ ZlibDecompressor decompressor = new ZlibDecompressor();
+ assertFalse("testZlibCompressDecompress finished error",
+ compressor.finished());
+ compressor.setInput(rawData, 0, rawData.length);
+ assertTrue("testZlibCompressDecompress getBytesRead before error",
+ compressor.getBytesRead() == 0);
+ compressor.finish();
+
+ byte[] compressedResult = new byte[rawDataSize];
+ int cSize = compressor.compress(compressedResult, 0, rawDataSize);
+ assertTrue("testZlibCompressDecompress getBytesRead ather error",
+ compressor.getBytesRead() == rawDataSize);
+ assertTrue(
+ "testZlibCompressDecompress compressed size no less then original size",
+ cSize < rawDataSize);
+ decompressor.setInput(compressedResult, 0, cSize);
+ byte[] decompressedBytes = new byte[rawDataSize];
+ decompressor.decompress(decompressedBytes, 0, decompressedBytes.length);
+ assertArrayEquals("testZlibCompressDecompress arrays not equals ",
+ rawData, decompressedBytes);
+ compressor.reset();
+ decompressor.reset();
+ } catch (IOException ex) {
+ fail("testZlibCompressDecompress ex !!!" + ex);
+ }
+ }
+
+ @Test
+ public void testZlibCompressorDecompressorSetDictionary() {
+ Configuration conf = new Configuration();
+ conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);
+ if (ZlibFactory.isNativeZlibLoaded(conf)) {
+ Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf);
+ Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf);
+
+ checkSetDictionaryNullPointerException(zlibCompressor);
+ checkSetDictionaryNullPointerException(zlibDecompressor);
+
+ checkSetDictionaryArrayIndexOutOfBoundsException(zlibDecompressor);
+ checkSetDictionaryArrayIndexOutOfBoundsException(zlibCompressor);
+ } else {
+ assertTrue("ZlibFactory is using native libs against request",
+ ZlibFactory.isNativeZlibLoaded(conf));
+ }
+ }
+
+ @Test
+ public void testZlibFactory() {
+ Configuration cfg = new Configuration();
+
+ assertTrue("testZlibFactory compression level error !!!",
+ CompressionLevel.DEFAULT_COMPRESSION == ZlibFactory
+ .getCompressionLevel(cfg));
+
+ assertTrue("testZlibFactory compression strategy error !!!",
+ CompressionStrategy.DEFAULT_STRATEGY == ZlibFactory
+ .getCompressionStrategy(cfg));
+
+ ZlibFactory.setCompressionLevel(cfg, CompressionLevel.BEST_COMPRESSION);
+ assertTrue("testZlibFactory compression strategy error !!!",
+ CompressionLevel.BEST_COMPRESSION == ZlibFactory
+ .getCompressionLevel(cfg));
+
+ ZlibFactory.setCompressionStrategy(cfg, CompressionStrategy.FILTERED);
+ assertTrue("testZlibFactory compression strategy error !!!",
+ CompressionStrategy.FILTERED == ZlibFactory.getCompressionStrategy(cfg));
+ }
+
+
+ private boolean checkSetDictionaryNullPointerException(
+ Decompressor decompressor) {
+ try {
+ decompressor.setDictionary(null, 0, 1);
+ } catch (NullPointerException ex) {
+ return true;
+ } catch (Exception ex) {
+ }
+ return false;
+ }
+
+ private boolean checkSetDictionaryNullPointerException(Compressor compressor) {
+ try {
+ compressor.setDictionary(null, 0, 1);
+ } catch (NullPointerException ex) {
+ return true;
+ } catch (Exception ex) {
+ }
+ return false;
+ }
+
+ private boolean checkSetDictionaryArrayIndexOutOfBoundsException(
+ Compressor compressor) {
+ try {
+ compressor.setDictionary(new byte[] { (byte) 0 }, 0, -1);
+ } catch (ArrayIndexOutOfBoundsException e) {
+ return true;
+ } catch (Exception e) {
+ }
+ return false;
+ }
+
+ private boolean checkSetDictionaryArrayIndexOutOfBoundsException(
+ Decompressor decompressor) {
+ try {
+ decompressor.setDictionary(new byte[] { (byte) 0 }, 0, -1);
+ } catch (ArrayIndexOutOfBoundsException e) {
+ return true;
+ } catch (Exception e) {
+ }
+ return false;
+ }
+
+ private byte[] compressDecompressZlib(byte[] rawData,
+ ZlibCompressor zlibCompressor, ZlibDecompressor zlibDecompressor)
+ throws IOException {
+ int cSize = 0;
+ byte[] compressedByte = new byte[rawData.length];
+ byte[] decompressedRawData = new byte[rawData.length];
+ zlibCompressor.setInput(rawData, 0, rawData.length);
+ zlibCompressor.finish();
+ while (!zlibCompressor.finished()) {
+ cSize = zlibCompressor.compress(compressedByte, 0, compressedByte.length);
+ }
+ zlibCompressor.reset();
+
+ assertTrue(zlibDecompressor.getBytesWritten() == 0);
+ assertTrue(zlibDecompressor.getBytesRead() == 0);
+ assertTrue(zlibDecompressor.needsInput());
+ zlibDecompressor.setInput(compressedByte, 0, cSize);
+ assertFalse(zlibDecompressor.needsInput());
+ while (!zlibDecompressor.finished()) {
+ zlibDecompressor.decompress(decompressedRawData, 0,
+ decompressedRawData.length);
+ }
+ assertTrue(zlibDecompressor.getBytesWritten() == rawData.length);
+ assertTrue(zlibDecompressor.getBytesRead() == cSize);
+ zlibDecompressor.reset();
+ assertTrue(zlibDecompressor.getRemaining() == 0);
+ assertArrayEquals(
+ "testZlibCompressorDecompressorWithConfiguration array equals error",
+ rawData, decompressedRawData);
+
+ return decompressedRawData;
+ }
+
+ @Test
+ public void testBuiltInGzipDecompressorExceptions() {
+ BuiltInGzipDecompressor decompresser = new BuiltInGzipDecompressor();
+ try {
+ decompresser.setInput(null, 0, 1);
+ } catch (NullPointerException ex) {
+ // expected
+ } catch (Exception ex) {
+ fail("testBuiltInGzipDecompressorExceptions npe error " + ex);
+ }
+
+ try {
+ decompresser.setInput(new byte[] { 0 }, 0, -1);
+ } catch (ArrayIndexOutOfBoundsException ex) {
+ // expected
+ } catch (Exception ex) {
+ fail("testBuiltInGzipDecompressorExceptions aioob error" + ex);
+ }
+
+ assertTrue("decompresser.getBytesRead error",
+ decompresser.getBytesRead() == 0);
+ assertTrue("decompresser.getRemaining error",
+ decompresser.getRemaining() == 0);
+ decompresser.reset();
+ decompresser.end();
+
+ InputStream decompStream = null;
+ try {
+ // invalid bytes 0 and 1; they must be 31, -117 (the gzip magic number)
+ int buffSize = 1 * 1024;
+ byte buffer[] = new byte[buffSize];
+ Decompressor decompressor = new BuiltInGzipDecompressor();
+ DataInputBuffer gzbuf = new DataInputBuffer();
+ decompStream = new DecompressorStream(gzbuf, decompressor);
+ gzbuf.reset(new byte[] { 0, 0, 1, 1, 1, 1, 11, 1, 1, 1, 1 }, 11);
+ decompStream.read(buffer);
+ } catch (IOException ioex) {
+ // expected
+ } catch (Exception ex) {
+ fail("invalid 0 and 1 byte in gzip stream" + ex);
+ }
+
+ // invalid byte 2; it must be 8 (the deflate compression method)
+ try {
+ int buffSize = 1 * 1024;
+ byte buffer[] = new byte[buffSize];
+ Decompressor decompressor = new BuiltInGzipDecompressor();
+ DataInputBuffer gzbuf = new DataInputBuffer();
+ decompStream = new DecompressorStream(gzbuf, decompressor);
+ gzbuf.reset(new byte[] { 31, -117, 7, 1, 1, 1, 1, 11, 1, 1, 1, 1 }, 11);
+ decompStream.read(buffer);
+ } catch (IOException ioex) {
+ // expected
+ } catch (Exception ex) {
+ fail("invalid 2 byte in gzip stream" + ex);
+ }
+
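+ // invalid byte 3: reserved FLG bits are set (-32 == 0xE0)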
+ try {
+ int buffSize = 1 * 1024;
+ byte buffer[] = new byte[buffSize];
+ Decompressor decompressor = new BuiltInGzipDecompressor();
+ DataInputBuffer gzbuf = new DataInputBuffer();
+ decompStream = new DecompressorStream(gzbuf, decompressor);
+ gzbuf.reset(new byte[] { 31, -117, 8, -32, 1, 1, 1, 11, 1, 1, 1, 1 }, 11);
+ decompStream.read(buffer);
+ } catch (IOException ioex) {
+ // expected
+ } catch (Exception ex) {
+ fail("invalid 3 byte in gzip stream" + ex);
+ }
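+ // byte 3 (FLG) = 4 sets the FEXTRA flag, so an extra field is expected (hasExtraField)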
+ try {
+ int buffSize = 1 * 1024;
+ byte buffer[] = new byte[buffSize];
+ Decompressor decompressor = new BuiltInGzipDecompressor();
+ DataInputBuffer gzbuf = new DataInputBuffer();
+ decompStream = new DecompressorStream(gzbuf, decompressor);
+ gzbuf.reset(new byte[] { 31, -117, 8, 4, 1, 1, 1, 11, 1, 1, 1, 1 }, 11);
+ decompStream.read(buffer);
+ } catch (IOException ioex) {
+ // expected
+ } catch (Exception ex) {
+ fail("invalid 3 byte make hasExtraField" + ex);
+ }
+ }
+
+ public static byte[] generate(int size) {
+ byte[] data = new byte[size];
+ for (int i = 0; i < size; i++)
+ data[i] = (byte)random.nextInt(16);
+ return data;
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
index 0602d302720..f5fc49dbde6 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
@@ -446,7 +446,13 @@ public class TestNativeIO {
NativeIO.renameTo(nonExistentFile, targetFile);
Assert.fail();
} catch (NativeIOException e) {
- Assert.assertEquals(e.getErrno(), Errno.ENOENT);
+ if (Path.WINDOWS) {
+ Assert.assertEquals(
+ String.format("The system cannot find the file specified.%n"),
+ e.getMessage());
+ } else {
+ Assert.assertEquals(Errno.ENOENT, e.getErrno());
+ }
}
// Test renaming a file to itself. It should succeed and do nothing.
@@ -465,7 +471,13 @@ public class TestNativeIO {
NativeIO.renameTo(sourceFile, badTarget);
Assert.fail();
} catch (NativeIOException e) {
- Assert.assertEquals(e.getErrno(), Errno.ENOTDIR);
+ if (Path.WINDOWS) {
+ Assert.assertEquals(
+ String.format("The parameter is incorrect.%n"),
+ e.getMessage());
+ } else {
+ Assert.assertEquals(Errno.ENOTDIR, e.getErrno());
+ }
}
FileUtils.deleteQuietly(TEST_DIR);
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/filter/TestPatternFilter.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/filter/TestPatternFilter.java
index 29849605e0f..2bdfdb978a9 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/filter/TestPatternFilter.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/filter/TestPatternFilter.java
@@ -24,7 +24,9 @@ import java.util.List;
import org.apache.commons.configuration.SubsetConfiguration;
import org.junit.Test;
import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.hadoop.metrics2.impl.ConfigBuilder;
import static org.apache.hadoop.metrics2.lib.Interns.*;
@@ -38,6 +40,8 @@ public class TestPatternFilter {
SubsetConfiguration empty = new ConfigBuilder().subset("");
shouldAccept(empty, "anything");
shouldAccept(empty, Arrays.asList(tag("key", "desc", "value")));
+ shouldAccept(empty, mockMetricsRecord("anything", Arrays.asList(
+ tag("key", "desc", "value"))));
}
/**
@@ -50,9 +54,15 @@ public class TestPatternFilter {
shouldAccept(wl, "foo");
shouldAccept(wl, Arrays.asList(tag("bar", "", ""),
tag("foo", "", "f")));
+ shouldAccept(wl, mockMetricsRecord("foo", Arrays.asList(
+ tag("bar", "", ""), tag("foo", "", "f"))));
shouldReject(wl, "bar");
shouldReject(wl, Arrays.asList(tag("bar", "", "")));
shouldReject(wl, Arrays.asList(tag("foo", "", "boo")));
+ shouldReject(wl, mockMetricsRecord("bar", Arrays.asList(
+ tag("foo", "", "f"))));
+ shouldReject(wl, mockMetricsRecord("foo", Arrays.asList(
+ tag("bar", "", ""))));
}
/**
@@ -64,9 +74,15 @@ public class TestPatternFilter {
.add("p.exclude.tags", "foo:f").subset("p");
shouldAccept(bl, "bar");
shouldAccept(bl, Arrays.asList(tag("bar", "", "")));
+ shouldAccept(bl, mockMetricsRecord("bar", Arrays.asList(
+ tag("bar", "", ""))));
shouldReject(bl, "foo");
shouldReject(bl, Arrays.asList(tag("bar", "", ""),
tag("foo", "", "f")));
+ shouldReject(bl, mockMetricsRecord("foo", Arrays.asList(
+ tag("bar", "", ""))));
+ shouldReject(bl, mockMetricsRecord("bar", Arrays.asList(
+ tag("bar", "", ""), tag("foo", "", "f"))));
}
/**
@@ -81,10 +97,18 @@ public class TestPatternFilter {
.add("p.exclude.tags", "bar:b").subset("p");
shouldAccept(c, "foo");
shouldAccept(c, Arrays.asList(tag("foo", "", "f")));
+ shouldAccept(c, mockMetricsRecord("foo", Arrays.asList(
+ tag("foo", "", "f"))));
shouldReject(c, "bar");
shouldReject(c, Arrays.asList(tag("bar", "", "b")));
+ shouldReject(c, mockMetricsRecord("bar", Arrays.asList(
+ tag("foo", "", "f"))));
+ shouldReject(c, mockMetricsRecord("foo", Arrays.asList(
+ tag("bar", "", "b"))));
shouldAccept(c, "foobar");
shouldAccept(c, Arrays.asList(tag("foobar", "", "")));
+ shouldAccept(c, mockMetricsRecord("foobar", Arrays.asList(
+ tag("foobar", "", ""))));
}
/**
@@ -98,6 +122,8 @@ public class TestPatternFilter {
.add("p.exclude.tags", "foo:f").subset("p");
shouldAccept(c, "foo");
shouldAccept(c, Arrays.asList(tag("foo", "", "f")));
+ shouldAccept(c, mockMetricsRecord("foo", Arrays.asList(
+ tag("foo", "", "f"))));
}
static void shouldAccept(SubsetConfiguration conf, String s) {
@@ -110,6 +136,17 @@ public class TestPatternFilter {
assertTrue("accepts "+ tags, newRegexFilter(conf).accepts(tags));
}
+ /**
+ * Asserts that filters with the given configuration accept the given record.
+ *
+ * @param conf SubsetConfiguration containing filter configuration
+ * @param record MetricsRecord to check
+ */
+ static void shouldAccept(SubsetConfiguration conf, MetricsRecord record) {
+ assertTrue("accepts " + record, newGlobFilter(conf).accepts(record));
+ assertTrue("accepts " + record, newRegexFilter(conf).accepts(record));
+ }
+
static void shouldReject(SubsetConfiguration conf, String s) {
assertTrue("rejects "+ s, !newGlobFilter(conf).accepts(s));
assertTrue("rejects "+ s, !newRegexFilter(conf).accepts(s));
@@ -120,6 +157,17 @@ public class TestPatternFilter {
assertTrue("rejects "+ tags, !newRegexFilter(conf).accepts(tags));
}
+ /**
+ * Asserts that filters with the given configuration reject the given record.
+ *
+ * @param conf SubsetConfiguration containing filter configuration
+ * @param record MetricsRecord to check
+ */
+ static void shouldReject(SubsetConfiguration conf, MetricsRecord record) {
+ assertTrue("rejects " + record, !newGlobFilter(conf).accepts(record));
+ assertTrue("rejects " + record, !newRegexFilter(conf).accepts(record));
+ }
+
/**
* Create a new glob filter with a config object
* @param conf the config object
@@ -141,4 +189,19 @@ public class TestPatternFilter {
f.init(conf);
return f;
}
+
+ /**
+ * Creates a mock MetricsRecord with the given name and tags.
+ *
+ * @param name String name
+ * @param tags List tags
+ * @return MetricsRecord newly created mock
+ */
+ private static MetricsRecord mockMetricsRecord(String name,
+ List<MetricsTag> tags) {
+ MetricsRecord record = mock(MetricsRecord.class);
+ when(record.name()).thenReturn(name);
+ when(record.tags()).thenReturn(tags);
+ return record;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java
similarity index 79%
rename from hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java
rename to hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java
index a3b63c7430c..2b6ce622cf8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java
@@ -20,30 +20,31 @@ package org.apache.hadoop.net;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.util.HashMap;
import java.util.Map;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.junit.Test;
public class TestNetworkTopologyWithNodeGroup {
private final static NetworkTopologyWithNodeGroup cluster = new
NetworkTopologyWithNodeGroup();
- private final static DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] {
- DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1/s1"),
- DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1/s1"),
- DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r1/s2"),
- DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r2/s3"),
- DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d1/r2/s3"),
- DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d1/r2/s4"),
- DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r3/s5"),
- DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/d2/r3/s6")
+ private final static NodeBase dataNodes[] = new NodeBase[] {
+ new NodeBase("h1", "/d1/r1/s1"),
+ new NodeBase("h2", "/d1/r1/s1"),
+ new NodeBase("h3", "/d1/r1/s2"),
+ new NodeBase("h4", "/d1/r2/s3"),
+ new NodeBase("h5", "/d1/r2/s3"),
+ new NodeBase("h6", "/d1/r2/s4"),
+ new NodeBase("h7", "/d2/r3/s5"),
+ new NodeBase("h8", "/d2/r3/s6")
};
private final static NodeBase computeNode = new NodeBase("/d1/r1/s1/h9");
+
+ private final static NodeBase rackOnlyNode = new NodeBase("h10", "/r2");
static {
  for(int i=0; i<dataNodes.length; i++) {
    cluster.add(dataNodes[i]);
  }
}

private Map<Node, Integer> pickNodesAtRandom(int numNodes,
    String excludedScope) {
  Map<Node, Integer> frequency = new HashMap<Node, Integer>();
- for (DatanodeDescriptor dnd : dataNodes) {
+ for (NodeBase dnd : dataNodes) {
frequency.put(dnd, 0);
}
@@ -161,6 +162,12 @@ public class TestNetworkTopologyWithNodeGroup {
/**
* This test checks that chooseRandom works for an excluded node.
*/
+ /**
+ * Test the replica placement policy when the last node is invalid.
+ * We create several nodes, but the last one has a faulty topology
+ * (rack-level information only), so it cannot be added to the cluster.
+ * We verify that a proper exception is thrown when the node is added
+ * and that the cluster itself is not affected.
+ */
@Test
public void testChooseRandomExcludedNode() {
String scope = "~" + NodeBase.getPath(dataNodes[0]);
@@ -171,5 +178,23 @@ public class TestNetworkTopologyWithNodeGroup {
assertTrue(frequency.get(key) > 0 || key == dataNodes[0]);
}
}
+
+ /**
+ * This test checks that adding a node with an invalid topology fails
+ * with an exception indicating that the topology is invalid.
+ */
+ @Test
+ public void testAddNodeWithInvalidTopology() {
+ // rackOnlyNode has a rack-level location only, which is invalid for a node-group topology
+ try {
+ cluster.add(rackOnlyNode);
+ fail("Exception should be thrown, so we should not have reached here.");
+ } catch (Exception e) {
+ if (!(e instanceof IllegalArgumentException)) {
+ fail("Expecting IllegalArgumentException, but caught:" + e);
+ }
+ assertTrue(e.getMessage().contains("illegal network location"));
+ }
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
index 5b08058d081..1625625241d 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.test;
import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Random;
@@ -330,4 +333,33 @@ public abstract class GenericTestUtils {
" but got:\n" + output,
Pattern.compile(pattern).matcher(output).find());
}
+
+ public static void assertValueNear(long expected, long actual, long allowedError) {
+ assertValueWithinRange(expected - allowedError, expected + allowedError, actual);
+ }
+
+ public static void assertValueWithinRange(long expectedMin, long expectedMax,
+ long actual) {
+ Assert.assertTrue("Expected " + actual + " to be in range (" + expectedMin + ","
+ + expectedMax + ")", expectedMin <= actual && actual <= expectedMax);
+ }
+
+ /**
+ * Assert that there are no threads running whose name matches the
+ * given regular expression.
+ * @param regex the regex to match against
+ */
+ public static void assertNoThreadsMatching(String regex) {
+ Pattern pattern = Pattern.compile(regex);
+ ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+
+ ThreadInfo[] infos = threadBean.getThreadInfo(threadBean.getAllThreadIds(), 20);
+ for (ThreadInfo info : infos) {
+ if (info == null) continue;
+ if (pattern.matcher(info.getThreadName()).matches()) {
+ Assert.fail("Leaked thread: " + info + "\n" +
+ Joiner.on("\n").join(info.getStackTrace()));
+ }
+ }
+ }
}
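For context, a hedged sketch of how the new assertNoThreadsMatching helper might be called from a test teardown; the test class name and thread-name regex are illustrative assumptions:

    import org.apache.hadoop.test.GenericTestUtils;
    import org.junit.After;

    public class ExampleThreadLeakCheck {
      @After
      public void checkForLeakedThreads() {
        // Fails the test (with stack traces) if any live thread name matches the regex.
        GenericTestUtils.assertNoThreadsMatching(".*IPC Server handler.*");
      }
    }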
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MockitoUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MockitoUtil.java
index 82abcadf279..32305b5ee78 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MockitoUtil.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MockitoUtil.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.test;
import java.io.Closeable;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.Stubber;
public abstract class MockitoUtil {
@@ -33,4 +36,29 @@ public abstract class MockitoUtil {
return Mockito.mock(clazz,
Mockito.withSettings().extraInterfaces(Closeable.class));
}
+
+ /**
+ * Throw an exception from the mock/spy only when the call stack at the
+ * time the method is invoked contains a line which matches the given
+ * pattern.
+ *
+ * @param t the Throwable to throw
+ * @param pattern the pattern against which to match the call stack trace
+ * @return the stub in progress
+ */
+ public static Stubber doThrowWhenCallStackMatches(
+ final Throwable t, final String pattern) {
+ return Mockito.doAnswer(new Answer