HBASE-3514 Speedup HFile.Writer append (Matteo via Ryan)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1076138 13f79535-47bb-0310-9956-ffa450edef68
Ryan Rawson 2011-03-02 08:32:09 +00:00
parent 1b99a716fe
commit e07675d2dc
4 changed files with 53 additions and 38 deletions


@@ -56,6 +56,7 @@ Release 0.91.0 - Unreleased
               using a different classloader than system default
    HBASE-3578  TableInputFormat does not setup the configuration for HBase
               mapreduce jobs correctly (Dan Harvey via Stack)
+   HBASE-3514  Speedup HFile.Writer append (Matteo via Ryan)
 
  IMPROVEMENTS
    HBASE-3290  Max Compaction Size (Nicolas Spiegelberg via Stack)


@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.util.BloomFilter;
 import org.apache.hadoop.hbase.util.ByteBloomFilter;
+import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.CompressionTest;
@@ -221,9 +222,6 @@ public class HFile {
     // Used to ensure we write in order.
     private final RawComparator<byte []> comparator;
 
-    // A stream made per block written.
-    private DataOutputStream out;
-
     // Number of uncompressed bytes per block. Reinitialized when we start
     // new block.
     private int blocksize;
@@ -264,9 +262,9 @@
     // Block cache to optionally fill on write
     private BlockCache blockCache;
 
-    // Additional byte array output stream used to fill block cache
-    private ByteArrayOutputStream baos;
-    private DataOutputStream baosDos;
+    // Byte array output stream made per block written.
+    private ByteBufferOutputStream bbos = null;
+    private DataOutputStream bbosDos = null;
     private int blockNumber = 0;
 
     /**
@@ -360,7 +358,7 @@
      * @throws IOException
      */
     private void checkBlockBoundary() throws IOException {
-      if (this.out != null && this.out.size() < blocksize) return;
+      if (bbosDos != null && bbosDos.size() < blocksize) return;
       finishBlock();
       newBlock();
     }
@@ -370,11 +368,18 @@
      * @throws IOException
      */
     private void finishBlock() throws IOException {
-      if (this.out == null) return;
+      if (bbosDos == null) return;
+
+      // Flush Data Output Stream
+      bbosDos.flush();
+
+      // Compress Data and write to output stream
+      DataOutputStream compressStream = getCompressingStream();
+      bbos.writeTo(compressStream);
+      int size = releaseCompressingStream(compressStream);
+
       long now = System.currentTimeMillis();
-      int size = releaseCompressingStream(this.out);
-      this.out = null;
 
       blockKeys.add(firstKey);
       blockOffsets.add(Long.valueOf(blockBegin));
       blockDataSizes.add(Integer.valueOf(size));
@@ -384,14 +389,15 @@
       writeOps++;
 
       if (blockCache != null) {
-        baosDos.flush();
-        byte [] bytes = baos.toByteArray();
-        ByteBuffer blockToCache = ByteBuffer.wrap(bytes, DATABLOCKMAGIC.length,
-            bytes.length - DATABLOCKMAGIC.length);
+        ByteBuffer blockToCache = bbos.getByteBuffer();
         String blockName = path.toString() + blockNumber;
         blockCache.cacheBlock(blockName, blockToCache);
-        baosDos.close();
       }
+
+      bbosDos.close();
+      bbosDos = null;
+      bbos = null;
+
       blockNumber++;
     }
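
The rewritten finishBlock() is the heart of the speedup: appends go into an in-memory buffer, the whole block is compressed once when it is finished, and the same uncompressed buffer can be handed to the block cache instead of keeping a second copy per key/value. The following is a rough standalone sketch of that flow, not the HBase code itself; it substitutes ByteArrayOutputStream and GZIP for ByteBufferOutputStream and HFile's pluggable compression, and the class name is made up.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.zip.GZIPOutputStream;

class BlockWriterSketch {
  private ByteArrayOutputStream block;   // stand-in for ByteBufferOutputStream
  private DataOutputStream blockDos;

  void newBlock(int blocksize) {
    block = new ByteArrayOutputStream(blocksize);
    blockDos = new DataOutputStream(block);
  }

  // Appends only touch the in-memory buffer; nothing is compressed per key/value.
  void append(byte[] key, byte[] value) throws IOException {
    blockDos.writeInt(key.length);
    blockDos.writeInt(value.length);
    blockDos.write(key);
    blockDos.write(value);
  }

  // Compress the buffered block once on the way to the file, and reuse the
  // same uncompressed bytes for a cache entry.
  ByteBuffer finishBlock(OutputStream fileOut) throws IOException {
    blockDos.flush();
    GZIPOutputStream compressed = new GZIPOutputStream(fileOut);
    block.writeTo(compressed);
    compressed.finish();
    return ByteBuffer.wrap(block.toByteArray());  // uncompressed copy for the cache
  }
}
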
@@ -402,14 +408,16 @@
     private void newBlock() throws IOException {
       // This is where the next block begins.
       blockBegin = outputStream.getPos();
-      this.out = getCompressingStream();
-      this.out.write(DATABLOCKMAGIC);
       firstKey = null;
-      if (blockCache != null) {
-        this.baos = new ByteArrayOutputStream();
-        this.baosDos = new DataOutputStream(baos);
-        this.baosDos.write(DATABLOCKMAGIC);
-      }
+
+      // to avoid too many calls to realloc(),
+      // pre-allocates the byte stream to the block size + 25%
+      // only if blocksize is under 1Gb
+      int bbosBlocksize = Math.max(blocksize, blocksize + (blocksize / 4));
+      bbos = new ByteBufferOutputStream(bbosBlocksize);
+      bbosDos = new DataOutputStream(bbos);
+      bbosDos.write(DATABLOCKMAGIC);
     }
 
     /*
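
The pre-allocation in newBlock() is worth unpacking: blocksize + (blocksize / 4) is computed in int arithmetic, so for very large block sizes the padded value wraps negative and Math.max falls back to the plain blocksize. A small standalone illustration of that guard (the class name and the sample sizes are made up, not from HBase):

public class PreallocSizeDemo {
  public static void main(String[] args) {
    int typical = 64 * 1024;                                          // e.g. a 64 KB block
    System.out.println(Math.max(typical, typical + (typical / 4)));   // 81920: 25% padding wins

    int huge = 2000000000;                                            // ~1.9 GB block size
    int padded = huge + (huge / 4);                                   // overflows int, wraps negative
    System.out.println(Math.max(huge, padded));                       // 2000000000: falls back to blocksize
  }
}
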
@@ -467,7 +475,7 @@
       for (i = 0; i < metaNames.size(); ++i) {
         // stop when the current key is greater than our own
         byte[] cur = metaNames.get(i);
         if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length,
             key, 0, key.length) > 0) {
           break;
         }
@@ -563,12 +571,12 @@
         checkBlockBoundary();
       }
       // Write length of key and value and then actual key and value bytes.
-      this.out.writeInt(klength);
+      this.bbosDos.writeInt(klength);
       this.keylength += klength;
-      this.out.writeInt(vlength);
+      this.bbosDos.writeInt(vlength);
       this.valuelength += vlength;
-      this.out.write(key, koffset, klength);
-      this.out.write(value, voffset, vlength);
+      this.bbosDos.write(key, koffset, klength);
+      this.bbosDos.write(value, voffset, vlength);
       // Are we the first key in this block?
       if (this.firstKey == null) {
         // Copy the key.
@@ -579,13 +587,6 @@
       this.lastKeyOffset = koffset;
       this.lastKeyLength = klength;
       this.entryCount ++;
-      // If we are pre-caching blocks on write, fill byte array stream
-      if (blockCache != null) {
-        this.baosDos.writeInt(klength);
-        this.baosDos.writeInt(vlength);
-        this.baosDos.write(key, koffset, klength);
-        this.baosDos.write(value, voffset, vlength);
-      }
     }
 
     /*


@@ -60,6 +60,7 @@ import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
 
 import com.google.common.base.Function;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;


@@ -1,5 +1,5 @@
 /*
- * Copyright 2010 The Apache Software Foundation
+ * Copyright 2011 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
@@ -18,13 +18,13 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hbase.ipc;
+package org.apache.hadoop.hbase.util;
 
-import org.apache.hadoop.hbase.util.Bytes;
-
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
 
 /**
  * Not thread safe!
@@ -81,6 +81,18 @@ public class ByteBufferOutputStream extends OutputStream {
     buf.put((byte)b);
   }
 
+  /**
+   * Writes the complete contents of this byte buffer output stream to
+   * the specified output stream argument.
+   *
+   * @param out the output stream to which to write the data.
+   * @exception IOException if an I/O error occurs.
+   */
+  public synchronized void writeTo(OutputStream out) throws IOException {
+    WritableByteChannel channel = Channels.newChannel(out);
+    channel.write(getByteBuffer());
+  }
+
   @Override
   public void write(byte[] b) throws IOException {
     checkSizeAndGrow(b.length);
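
The new writeTo(OutputStream) copies the buffered bytes into another stream through a WritableByteChannel, which is what lets finishBlock() feed a whole block to the compressor without first materializing a byte[]. A minimal usage sketch, relying only on the constructor, write(byte[]) and writeTo shown in this commit; the demo class name and values are made up for illustration.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.hadoop.hbase.util.ByteBufferOutputStream;

public class WriteToDemo {
  public static void main(String[] args) throws IOException {
    ByteBufferOutputStream bbos = new ByteBufferOutputStream(64);
    bbos.write("hello".getBytes("UTF-8"));

    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    bbos.writeTo(sink);                // channel-backed copy of the buffered bytes
    System.out.println(sink.size());   // 5
  }
}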