HBASE-3514 revert
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1076907 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c42b256df4
commit
507abe55ff
|
@ -55,7 +55,6 @@ Release 0.91.0 - Unreleased
|
|||
using a different classloader than system default
|
||||
HBASE-3578 TableInputFormat does not setup the configuration for HBase
|
||||
mapreduce jobs correctly (Dan Harvey via Stack)
|
||||
HBASE-3514 Speedup HFile.Writer append (Matteo via Ryan)
|
||||
HBASE-3593 DemoClient.cpp is outdated
|
||||
HBASE-3601 TestMasterFailover broken in TRUNK
|
||||
|
||||
|
|
|
@ -57,7 +57,6 @@ import org.apache.hadoop.hbase.io.HeapSize;
|
|||
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
|
||||
import org.apache.hadoop.hbase.util.BloomFilter;
|
||||
import org.apache.hadoop.hbase.util.ByteBloomFilter;
|
||||
import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
import org.apache.hadoop.hbase.util.CompressionTest;
|
||||
|
@ -222,6 +221,9 @@ public class HFile {
|
|||
// Used to ensure we write in order.
|
||||
private final RawComparator<byte []> comparator;
|
||||
|
||||
// A stream made per block written.
|
||||
private DataOutputStream out;
|
||||
|
||||
// Number of uncompressed bytes per block. Reinitialized when we start
|
||||
// new block.
|
||||
private int blocksize;
|
||||
|
@ -262,9 +264,9 @@ public class HFile {
|
|||
// Block cache to optionally fill on write
|
||||
private BlockCache blockCache;
|
||||
|
||||
// Byte array output stream made per block written.
|
||||
private ByteBufferOutputStream bbos = null;
|
||||
private DataOutputStream bbosDos = null;
|
||||
// Additional byte array output stream used to fill block cache
|
||||
private ByteArrayOutputStream baos;
|
||||
private DataOutputStream baosDos;
|
||||
private int blockNumber = 0;
|
||||
|
||||
/**
|
||||
|
@ -358,7 +360,7 @@ public class HFile {
|
|||
* @throws IOException
|
||||
*/
|
||||
private void checkBlockBoundary() throws IOException {
|
||||
if (bbosDos != null && bbosDos.size() < blocksize) return;
|
||||
if (this.out != null && this.out.size() < blocksize) return;
|
||||
finishBlock();
|
||||
newBlock();
|
||||
}
|
||||
|
@ -368,18 +370,11 @@ public class HFile {
|
|||
* @throws IOException
|
||||
*/
|
||||
private void finishBlock() throws IOException {
|
||||
if (bbosDos == null) return;
|
||||
|
||||
// Flush Data Output Stream
|
||||
bbosDos.flush();
|
||||
|
||||
// Compress Data and write to output stream
|
||||
DataOutputStream compressStream = getCompressingStream();
|
||||
bbos.writeTo(compressStream);
|
||||
int size = releaseCompressingStream(compressStream);
|
||||
|
||||
if (this.out == null) return;
|
||||
long now = System.currentTimeMillis();
|
||||
|
||||
int size = releaseCompressingStream(this.out);
|
||||
this.out = null;
|
||||
blockKeys.add(firstKey);
|
||||
blockOffsets.add(Long.valueOf(blockBegin));
|
||||
blockDataSizes.add(Integer.valueOf(size));
|
||||
|
@ -389,15 +384,14 @@ public class HFile {
|
|||
writeOps++;
|
||||
|
||||
if (blockCache != null) {
|
||||
ByteBuffer blockToCache = bbos.getByteBuffer();
|
||||
baosDos.flush();
|
||||
byte [] bytes = baos.toByteArray();
|
||||
ByteBuffer blockToCache = ByteBuffer.wrap(bytes, DATABLOCKMAGIC.length,
|
||||
bytes.length - DATABLOCKMAGIC.length);
|
||||
String blockName = path.toString() + blockNumber;
|
||||
blockCache.cacheBlock(blockName, blockToCache);
|
||||
baosDos.close();
|
||||
}
|
||||
|
||||
bbosDos.close();
|
||||
bbosDos = null;
|
||||
bbos = null;
|
||||
|
||||
blockNumber++;
|
||||
}
|
||||
|
||||
|
@ -408,16 +402,14 @@ public class HFile {
|
|||
private void newBlock() throws IOException {
|
||||
// This is where the next block begins.
|
||||
blockBegin = outputStream.getPos();
|
||||
|
||||
this.out = getCompressingStream();
|
||||
this.out.write(DATABLOCKMAGIC);
|
||||
firstKey = null;
|
||||
|
||||
// to avoid too many calls to realloc(),
|
||||
// pre-allocates the byte stream to the block size + 25%
|
||||
// only if blocksize is under 1Gb
|
||||
int bbosBlocksize = Math.max(blocksize, blocksize + (blocksize / 4));
|
||||
bbos = new ByteBufferOutputStream(bbosBlocksize);
|
||||
bbosDos = new DataOutputStream(bbos);
|
||||
bbosDos.write(DATABLOCKMAGIC);
|
||||
if (blockCache != null) {
|
||||
this.baos = new ByteArrayOutputStream();
|
||||
this.baosDos = new DataOutputStream(baos);
|
||||
this.baosDos.write(DATABLOCKMAGIC);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -475,7 +467,7 @@ public class HFile {
|
|||
for (i = 0; i < metaNames.size(); ++i) {
|
||||
// stop when the current key is greater than our own
|
||||
byte[] cur = metaNames.get(i);
|
||||
if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length,
|
||||
if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length,
|
||||
key, 0, key.length) > 0) {
|
||||
break;
|
||||
}
|
||||
|
@ -571,12 +563,12 @@ public class HFile {
|
|||
checkBlockBoundary();
|
||||
}
|
||||
// Write length of key and value and then actual key and value bytes.
|
||||
this.bbosDos.writeInt(klength);
|
||||
this.out.writeInt(klength);
|
||||
this.keylength += klength;
|
||||
this.bbosDos.writeInt(vlength);
|
||||
this.out.writeInt(vlength);
|
||||
this.valuelength += vlength;
|
||||
this.bbosDos.write(key, koffset, klength);
|
||||
this.bbosDos.write(value, voffset, vlength);
|
||||
this.out.write(key, koffset, klength);
|
||||
this.out.write(value, voffset, vlength);
|
||||
// Are we the first key in this block?
|
||||
if (this.firstKey == null) {
|
||||
// Copy the key.
|
||||
|
@ -587,6 +579,13 @@ public class HFile {
|
|||
this.lastKeyOffset = koffset;
|
||||
this.lastKeyLength = klength;
|
||||
this.entryCount ++;
|
||||
// If we are pre-caching blocks on write, fill byte array stream
|
||||
if (blockCache != null) {
|
||||
this.baosDos.writeInt(klength);
|
||||
this.baosDos.writeInt(vlength);
|
||||
this.baosDos.write(key, koffset, klength);
|
||||
this.baosDos.write(value, voffset, vlength);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2011 The Apache Software Foundation
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
|
@ -18,13 +18,13 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.util;
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.Channels;
|
||||
import java.nio.channels.WritableByteChannel;
|
||||
|
||||
/**
|
||||
* Not thread safe!
|
||||
|
@ -81,18 +81,6 @@ public class ByteBufferOutputStream extends OutputStream {
|
|||
buf.put((byte)b);
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes the complete contents of this byte buffer output stream to
|
||||
* the specified output stream argument.
|
||||
*
|
||||
* @param out the output stream to which to write the data.
|
||||
* @exception IOException if an I/O error occurs.
|
||||
*/
|
||||
public synchronized void writeTo(OutputStream out) throws IOException {
|
||||
WritableByteChannel channel = Channels.newChannel(out);
|
||||
channel.write(getByteBuffer());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte[] b) throws IOException {
|
||||
checkSizeAndGrow(b.length);
|
|
@ -60,7 +60,6 @@ import org.apache.hadoop.ipc.VersionedProtocol;
|
|||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
|
Loading…
Reference in New Issue