diff --git a/CHANGES.txt b/CHANGES.txt index c3ebc0bd130..5c0e2fd7e38 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -55,7 +55,6 @@ Release 0.91.0 - Unreleased using a different classloader than system default HBASE-3578 TableInputFormat does not setup the configuration for HBase mapreduce jobs correctly (Dan Harvey via Stack) - HBASE-3514 Speedup HFile.Writer append (Matteo via Ryan) HBASE-3593 DemoClient.cpp is outdated HBASE-3601 TestMasterFailover broken in TRUNK diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index 3a6dcf22f1d..06f391ce00b 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -57,7 +57,6 @@ import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; import org.apache.hadoop.hbase.util.BloomFilter; import org.apache.hadoop.hbase.util.ByteBloomFilter; -import org.apache.hadoop.hbase.util.ByteBufferOutputStream; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.CompressionTest; @@ -222,6 +221,9 @@ public class HFile { // Used to ensure we write in order. private final RawComparator comparator; + // A stream made per block written. + private DataOutputStream out; + // Number of uncompressed bytes per block. Reinitialized when we start // new block. private int blocksize; @@ -262,9 +264,9 @@ public class HFile { // Block cache to optionally fill on write private BlockCache blockCache; - // Byte array output stream made per block written. - private ByteBufferOutputStream bbos = null; - private DataOutputStream bbosDos = null; + // Additional byte array output stream used to fill block cache + private ByteArrayOutputStream baos; + private DataOutputStream baosDos; private int blockNumber = 0; /** @@ -358,7 +360,7 @@ public class HFile { * @throws IOException */ private void checkBlockBoundary() throws IOException { - if (bbosDos != null && bbosDos.size() < blocksize) return; + if (this.out != null && this.out.size() < blocksize) return; finishBlock(); newBlock(); } @@ -368,18 +370,11 @@ public class HFile { * @throws IOException */ private void finishBlock() throws IOException { - if (bbosDos == null) return; - - // Flush Data Output Stream - bbosDos.flush(); - - // Compress Data and write to output stream - DataOutputStream compressStream = getCompressingStream(); - bbos.writeTo(compressStream); - int size = releaseCompressingStream(compressStream); - + if (this.out == null) return; long now = System.currentTimeMillis(); + int size = releaseCompressingStream(this.out); + this.out = null; blockKeys.add(firstKey); blockOffsets.add(Long.valueOf(blockBegin)); blockDataSizes.add(Integer.valueOf(size)); @@ -389,15 +384,14 @@ public class HFile { writeOps++; if (blockCache != null) { - ByteBuffer blockToCache = bbos.getByteBuffer(); + baosDos.flush(); + byte [] bytes = baos.toByteArray(); + ByteBuffer blockToCache = ByteBuffer.wrap(bytes, DATABLOCKMAGIC.length, + bytes.length - DATABLOCKMAGIC.length); String blockName = path.toString() + blockNumber; blockCache.cacheBlock(blockName, blockToCache); + baosDos.close(); } - - bbosDos.close(); - bbosDos = null; - bbos = null; - blockNumber++; } @@ -408,16 +402,14 @@ public class HFile { private void newBlock() throws IOException { // This is where the next block begins. blockBegin = outputStream.getPos(); - + this.out = getCompressingStream(); + this.out.write(DATABLOCKMAGIC); firstKey = null; - - // to avoid too many calls to realloc(), - // pre-allocates the byte stream to the block size + 25% - // only if blocksize is under 1Gb - int bbosBlocksize = Math.max(blocksize, blocksize + (blocksize / 4)); - bbos = new ByteBufferOutputStream(bbosBlocksize); - bbosDos = new DataOutputStream(bbos); - bbosDos.write(DATABLOCKMAGIC); + if (blockCache != null) { + this.baos = new ByteArrayOutputStream(); + this.baosDos = new DataOutputStream(baos); + this.baosDos.write(DATABLOCKMAGIC); + } } /* @@ -475,7 +467,7 @@ public class HFile { for (i = 0; i < metaNames.size(); ++i) { // stop when the current key is greater than our own byte[] cur = metaNames.get(i); - if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, + if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0, key.length) > 0) { break; } @@ -571,12 +563,12 @@ public class HFile { checkBlockBoundary(); } // Write length of key and value and then actual key and value bytes. - this.bbosDos.writeInt(klength); + this.out.writeInt(klength); this.keylength += klength; - this.bbosDos.writeInt(vlength); + this.out.writeInt(vlength); this.valuelength += vlength; - this.bbosDos.write(key, koffset, klength); - this.bbosDos.write(value, voffset, vlength); + this.out.write(key, koffset, klength); + this.out.write(value, voffset, vlength); // Are we the first key in this block? if (this.firstKey == null) { // Copy the key. @@ -587,6 +579,13 @@ public class HFile { this.lastKeyOffset = koffset; this.lastKeyLength = klength; this.entryCount ++; + // If we are pre-caching blocks on write, fill byte array stream + if (blockCache != null) { + this.baosDos.writeInt(klength); + this.baosDos.writeInt(vlength); + this.baosDos.write(key, koffset, klength); + this.baosDos.write(value, voffset, vlength); + } } /* diff --git a/src/main/java/org/apache/hadoop/hbase/util/ByteBufferOutputStream.java b/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java similarity index 81% rename from src/main/java/org/apache/hadoop/hbase/util/ByteBufferOutputStream.java rename to src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java index d1e7b8c901e..4d8ecbd407a 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/ByteBufferOutputStream.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java @@ -1,5 +1,5 @@ /* - * Copyright 2011 The Apache Software Foundation + * Copyright 2010 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -18,13 +18,13 @@ * limitations under the License. */ -package org.apache.hadoop.hbase.util; +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; /** * Not thread safe! @@ -81,18 +81,6 @@ public class ByteBufferOutputStream extends OutputStream { buf.put((byte)b); } - /** - * Writes the complete contents of this byte buffer output stream to - * the specified output stream argument. - * - * @param out the output stream to which to write the data. - * @exception IOException if an I/O error occurs. - */ - public synchronized void writeTo(OutputStream out) throws IOException { - WritableByteChannel channel = Channels.newChannel(out); - channel.write(getByteBuffer()); - } - @Override public void write(byte[] b) throws IOException { checkSizeAndGrow(b.length); diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java index ec28de42701..f36fe6249a9 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java @@ -60,7 +60,6 @@ import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.hbase.util.ByteBufferOutputStream; import com.google.common.base.Function; import com.google.common.util.concurrent.ThreadFactoryBuilder;