From 663518032449940f136ae8a9f149ed453837a8d8 Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Thu, 24 Jul 2014 08:24:05 +0000 Subject: [PATCH] MAPREDUCE-6000. native-task: Simplify ByteBufferDataReader/Writer. Contributed by Todd Lipcon. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-2841@1613036 13f79535-47bb-0310-9956-ffa450edef68 --- .../CHANGES.MAPREDUCE-2841.txt | 1 + .../buffer/ByteBufferDataReader.java | 126 +--------------- .../buffer/ByteBufferDataWriter.java | 141 ++++-------------- .../buffer/TestByteBufferReadWrite.java | 93 +++++------- 4 files changed, 75 insertions(+), 286 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt b/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt index e12f7439fc5..aa695cf4570 100644 --- a/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt +++ b/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt @@ -5,3 +5,4 @@ MAPREDUCE-5985. native-task: Fix build on macosx. Contributed by Binglin Chang MAPREDUCE-5994. Simplify ByteUtils and fix failing test. (todd) MAPREDUCE-5996. native-task: Rename system tests into standard directory layout (todd) MAPREDUCE-5997. native-task: Use DirectBufferPool from Hadoop Common (todd) +MAPREDUCE-6000. native-task: Simplify ByteBufferDataReader/Writer (todd) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java index 5af71802a80..24f402df643 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.mapred.nativetask.buffer; +import com.google.common.base.Charsets; + import java.io.DataInput; import java.io.EOFException; import java.io.IOException; @@ -31,11 +33,13 @@ import java.nio.ByteBuffer; public class ByteBufferDataReader extends DataInputStream { private ByteBuffer byteBuffer; private char lineCache[]; + private java.io.DataInputStream javaReader; public ByteBufferDataReader(InputBuffer buffer) { if (buffer != null) { - this.byteBuffer = buffer.getByteBuffer(); + reset(buffer); } + javaReader = new java.io.DataInputStream(this); } public void reset(InputBuffer buffer) { @@ -128,128 +132,12 @@ public class ByteBufferDataReader extends DataInputStream { @Override public String readLine() throws IOException { - - InputStream in = this; - - char buf[] = lineCache; - - if (buf == null) { - buf = lineCache = new char[128]; - } - - int room = buf.length; - int offset = 0; - int c; - - loop: while (true) { - switch (c = in.read()) { - case -1: - case '\n': - break loop; - - case '\r': - final int c2 = in.read(); - if ((c2 != '\n') && (c2 != -1)) { - if (!(in instanceof PushbackInputStream)) { - in = new PushbackInputStream(in); - } - ((PushbackInputStream) in).unread(c2); - } - break loop; - - default: - if (--room < 0) { - buf = new char[offset + 128]; - room = buf.length - offset - 1; - System.arraycopy(lineCache, 0, buf, 0, offset); - lineCache = buf; - } - buf[offset++] = (char) c; - break; - } - } - if ((c == -1) && (offset == 0)) { - return null; - } - return String.copyValueOf(buf, 0, offset); + return javaReader.readLine(); } @Override public final String readUTF() throws IOException { - return readUTF(this); - } - - private final static String readUTF(DataInput in) throws IOException { - final int utflen = in.readUnsignedShort(); - byte[] bytearr = null; - char[] chararr = null; - - bytearr = new byte[utflen]; - chararr = new char[utflen]; - - int c, char2, char3; - int count = 0; - int chararr_count = 0; - - in.readFully(bytearr, 0, utflen); - - while (count < utflen) { - c = bytearr[count] & 0xff; - if (c > 127) { - break; - } - count++; - chararr[chararr_count++] = (char) c; - } - - while (count < utflen) { - c = bytearr[count] & 0xff; - switch (c >> 4) { - case 0: - case 1: - case 2: - case 3: - case 4: - case 5: - case 6: - case 7: - /* 0xxxxxxx */ - count++; - chararr[chararr_count++] = (char) c; - break; - case 12: - case 13: - /* 110x xxxx 10xx xxxx */ - count += 2; - if (count > utflen) { - throw new UTFDataFormatException("malformed input: partial character at end"); - } - char2 = bytearr[count - 1]; - if ((char2 & 0xC0) != 0x80) { - throw new UTFDataFormatException("malformed input around byte " + count); - } - chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F)); - break; - case 14: - /* 1110 xxxx 10xx xxxx 10xx xxxx */ - count += 3; - if (count > utflen) { - throw new UTFDataFormatException("malformed input: partial character at end"); - } - char2 = bytearr[count - 2]; - char3 = bytearr[count - 1]; - if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) { - throw new UTFDataFormatException("malformed input around byte " + (count - 1)); - } - chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0)); - break; - default: - /* 10xx xxxx, 1111 xxxx */ - throw new UTFDataFormatException("malformed input around byte " + count); - } - } - // The number of chars produced may be less than utflen - return new String(chararr, 0, chararr_count); + return javaReader.readUTF(); } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java index 36b2fcfcaae..aa87e0d0d75 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java @@ -22,31 +22,42 @@ import java.io.IOException; import java.io.UTFDataFormatException; import java.nio.ByteBuffer; +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; +import com.google.common.primitives.UnsignedInteger; +import com.google.common.primitives.UnsignedInts; import org.apache.hadoop.mapred.nativetask.NativeDataTarget; /** - * write data to a output buffer + * DataOutputStream implementation which buffers data in a fixed-size + * ByteBuffer. + * When the byte buffer has filled up, synchronously passes the buffer + * to a downstream NativeDataTarget. */ public class ByteBufferDataWriter extends DataOutputStream { - private ByteBuffer buffer; + private final ByteBuffer buffer; private final NativeDataTarget target; - private void checkSizeAndFlushNecessary(int length) throws IOException { + private final static byte TRUE = (byte) 1; + private final static byte FALSE = (byte) 0; + private final java.io.DataOutputStream javaWriter; + + private void checkSizeAndFlushIfNecessary(int length) throws IOException { if (buffer.position() > 0 && buffer.remaining() < length) { flush(); } } public ByteBufferDataWriter(NativeDataTarget handler) { - if (null != handler) { - this.buffer = handler.getOutputBuffer().getByteBuffer(); - } + Preconditions.checkNotNull(handler); + this.buffer = handler.getOutputBuffer().getByteBuffer(); this.target = handler; + this.javaWriter = new java.io.DataOutputStream(this); } @Override public synchronized void write(int v) throws IOException { - checkSizeAndFlushNecessary(1); + checkSizeAndFlushIfNecessary(1); buffer.put((byte) v); } @@ -89,164 +100,72 @@ public class ByteBufferDataWriter extends DataOutputStream { target.finishSendData(); } - private final static byte TRUE = (byte) 1; - private final static byte FALSE = (byte) 0; - @Override public final void writeBoolean(boolean v) throws IOException { - checkSizeAndFlushNecessary(1); + checkSizeAndFlushIfNecessary(1); buffer.put(v ? TRUE : FALSE); } @Override public final void writeByte(int v) throws IOException { - checkSizeAndFlushNecessary(1); + checkSizeAndFlushIfNecessary(1); buffer.put((byte) v); } @Override public final void writeShort(int v) throws IOException { - checkSizeAndFlushNecessary(2); + checkSizeAndFlushIfNecessary(2); buffer.putShort((short) v); } @Override public final void writeChar(int v) throws IOException { - checkSizeAndFlushNecessary(2); + checkSizeAndFlushIfNecessary(2); buffer.put((byte) ((v >>> 8) & 0xFF)); buffer.put((byte) ((v >>> 0) & 0xFF)); } @Override public final void writeInt(int v) throws IOException { - checkSizeAndFlushNecessary(4); + checkSizeAndFlushIfNecessary(4); buffer.putInt(v); } @Override public final void writeLong(long v) throws IOException { - checkSizeAndFlushNecessary(8); + checkSizeAndFlushIfNecessary(8); buffer.putLong(v); } @Override public final void writeFloat(float v) throws IOException { - checkSizeAndFlushNecessary(4); + checkSizeAndFlushIfNecessary(4); writeInt(Float.floatToIntBits(v)); } @Override public final void writeDouble(double v) throws IOException { - checkSizeAndFlushNecessary(8); + checkSizeAndFlushIfNecessary(8); writeLong(Double.doubleToLongBits(v)); } @Override public final void writeBytes(String s) throws IOException { - final int len = s.length(); - - int remain = len; - int offset = 0; - while (remain > 0) { - int currentFlush = 0; - if (buffer.remaining() > 0) { - currentFlush = Math.min(buffer.remaining(), remain); - - for (int i = 0; i < currentFlush; i++) { - buffer.put((byte) s.charAt(offset + i)); - } - - remain -= currentFlush; - offset += currentFlush; - } else { - flush(); - } - } + javaWriter.writeBytes(s); } @Override public final void writeChars(String s) throws IOException { - final int len = s.length(); - - int remain = len; - int offset = 0; - - while (remain > 0) { - int currentFlush = 0; - if (buffer.remaining() > 2) { - currentFlush = Math.min(buffer.remaining() / 2, remain); - - for (int i = 0; i < currentFlush; i++) { - buffer.putChar(s.charAt(offset + i)); - } - - remain -= currentFlush; - offset += currentFlush; - } else { - flush(); - } - } + javaWriter.writeChars(s); } @Override public final void writeUTF(String str) throws IOException { - writeUTF(str, this); - } - - private int writeUTF(String str, DataOutput out) throws IOException { - final int strlen = str.length(); - int utflen = 0; - int c, count = 0; - - /* use charAt instead of copying String to char array */ - for (int i = 0; i < strlen; i++) { - c = str.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - utflen++; - } else if (c > 0x07FF) { - utflen += 3; - } else { - utflen += 2; - } - } - - if (utflen > 65535) { - throw new UTFDataFormatException("encoded string too long: " + utflen + " bytes"); - } - - final byte[] bytearr = new byte[utflen + 2]; - bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF); - bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF); - - int i = 0; - for (i = 0; i < strlen; i++) { - c = str.charAt(i); - if (!((c >= 0x0001) && (c <= 0x007F))) { - break; - } - bytearr[count++] = (byte) c; - } - - for (; i < strlen; i++) { - c = str.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - bytearr[count++] = (byte) c; - - } else if (c > 0x07FF) { - bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); - bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F)); - bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F)); - } else { - bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); - bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F)); - } - } - write(bytearr, 0, utflen + 2); - return utflen + 2; + javaWriter.writeUTF(str); } @Override public boolean hasUnFlushedData() { - return !(buffer.position() == 0); + return buffer.position() > 0; } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java index 424354bd99e..a405634b140 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java @@ -17,20 +17,20 @@ */ package org.apache.hadoop.mapred.nativetask.buffer; -import java.io.IOException; -import java.io.UnsupportedEncodingException; +import java.io.*; +import com.google.common.base.Charsets; +import com.google.common.primitives.Shorts; import org.apache.hadoop.mapred.nativetask.NativeDataTarget; import junit.framework.Assert; import junit.framework.TestCase; +import org.mockito.Mockito; -public class TestByteBufferReadWrite extends TestCase{ - - +public class TestByteBufferReadWrite extends TestCase { public void testReadWrite() throws IOException { byte[] buff = new byte[10000]; - + InputBuffer input = new InputBuffer(buff); MockDataTarget target = new MockDataTarget(buff); ByteBufferDataWriter writer = new ByteBufferDataWriter(target); @@ -48,7 +48,7 @@ public class TestByteBufferReadWrite extends TestCase{ writer.writeBytes("goodboy"); writer.writeChars("hello"); writer.writeUTF("native task"); - + int length = target.getOutputBuffer().length(); input.rewind(0, length); ByteBufferDataReader reader = new ByteBufferDataReader(input); @@ -84,7 +84,29 @@ public class TestByteBufferReadWrite extends TestCase{ Assert.assertEquals(0, input.remaining()); } - + + /** + * Test that Unicode characters outside the basic multilingual plane, + * such as this cat face, are properly encoded. + */ + public void testCatFace() throws IOException { + byte[] buff = new byte[10]; + MockDataTarget target = new MockDataTarget(buff); + ByteBufferDataWriter writer = new ByteBufferDataWriter(target); + String catFace = "\uD83D\uDE38"; + writer.writeUTF(catFace); + + // Check that our own decoder can read it + InputBuffer input = new InputBuffer(buff); + input.rewind(0, buff.length); + ByteBufferDataReader reader = new ByteBufferDataReader(input); + assertEquals(catFace, reader.readUTF()); + + // Check that the standard Java one can read it too + String fromJava = new java.io.DataInputStream(new ByteArrayInputStream(buff)).readUTF(); + assertEquals(catFace, fromJava); + } + public void testShortOfSpace() throws IOException { byte[] buff = new byte[10]; MockDataTarget target = new MockDataTarget(buff); @@ -100,20 +122,8 @@ public class TestByteBufferReadWrite extends TestCase{ public void testFlush() throws IOException { byte[] buff = new byte[10]; - final Counter flushCount = new Counter(); - final Flag finishFlag = new Flag(); - MockDataTarget target = new MockDataTarget(buff) { - @Override - public void sendData() throws IOException { - flushCount.increase(); - } - - @Override - public void finishSendData() throws IOException { - finishFlag.set(true); - } - }; - + MockDataTarget target = Mockito.spy(new MockDataTarget(buff)); + ByteBufferDataWriter writer = new ByteBufferDataWriter(target); Assert.assertEquals(false, writer.hasUnFlushedData()); @@ -121,10 +131,9 @@ public class TestByteBufferReadWrite extends TestCase{ writer.write(new byte[100]); Assert.assertEquals(true, writer.hasUnFlushedData()); - writer.close(); - Assert.assertEquals(11, flushCount.get()); - Assert.assertEquals(true, finishFlag.get()); - + writer.close(); + Mockito.verify(target, Mockito.times(11)).sendData(); + Mockito.verify(target).finishSendData(); } private static String toString(byte[] str) throws UnsupportedEncodingException { @@ -140,42 +149,14 @@ public class TestByteBufferReadWrite extends TestCase{ } @Override - public void sendData() throws IOException { - - } + public void sendData() throws IOException {} @Override - public void finishSendData() throws IOException { - - } + public void finishSendData() throws IOException {} @Override public OutputBuffer getOutputBuffer() { return out; } } - - private static class Counter { - private int count; - - public int get() { - return count; - } - - public void increase() { - count++; - } - } - - private static class Flag { - private boolean value; - - public void set(boolean status) { - this.value = status; - } - - public boolean get() { - return this.value; - } - } }