HBASE-900 Regionserver memory leak causing OOME during relatively modest bulk importing; part 2
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@726153 13f79535-47bb-0310-9956-ffa450edef68
commit 3f80f67db1
parent 0f3838e1d1

CHANGES.txt
@@ -102,7 +102,7 @@ Release 0.19.0 - Unreleased
    HBASE-1000 Sleeper.sleep does not go back to sleep when interrupted
               and no stop flag given.
    HBASE-900  Regionserver memory leak causing OOME during relatively
-              modest bulk importing; part 1
+              modest bulk importing; part 1 and part 2
    HBASE-1054 Index NPE on scanning (Clint Morgan via Andrew Purtell)
    HBASE-1052 Stopping a HRegionServer with unflushed cache causes data loss
               from org.apache.hadoop.hbase.DroppedSnapshotException

hbase-default.xml
@@ -53,9 +53,12 @@
   </property>
   <property>
     <name>hbase.client.write.buffer</name>
-    <value>10485760</value>
+    <value>2097152</value>
     <description>Size of the write buffer in bytes. A bigger buffer takes more
-    memory but reduces the number of RPCs.
+    memory -- on both the client and server side, since the server instantiates
+    the passed write buffer to process it -- but reduces the number of RPCs.
+    For an estimate of the server-side memory used, evaluate
+    hbase.client.write.buffer * hbase.regionserver.handler.count
     </description>
   </property>
   <property>

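Note on the changed default: the description's estimate can be made concrete. Assuming a region server runs the era's default of 10 RPC handlers (illustrative; see hbase.regionserver.handler.count in hbase-default.xml for the real value):

    2,097,152 bytes (new 2 MB buffer)   * 10 handlers = ~20 MB held server-side
    10,485,760 bytes (old 10 MB buffer) * 10 handlers = ~100 MB held server-side

That unaccounted server-side allocation is the kind of pressure this OOME issue is trimming.
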
HStoreKey.java
@@ -24,6 +24,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
@@ -31,7 +32,7 @@ import org.apache.hadoop.io.WritableComparator;
 /**
  * A Key for a stored row.
  */
-public class HStoreKey implements WritableComparable<HStoreKey> {
+public class HStoreKey implements WritableComparable<HStoreKey>, HeapSize {
   /**
    * Colon character in UTF-8
    */
@@ -47,6 +48,14 @@ public class HStoreKey implements WritableComparable<HStoreKey> {
    */
   private HRegionInfo regionInfo = null;
 
+  /**
+   * Estimated size tax paid for each instance of HSK.  Estimate based on
+   * study of jhat and jprofiler numbers.
+   */
+  // jprofiler says shallow size is 48 bytes.  Add to it the cost of the two
+  // byte arrays and then something for the HRI hosting.
+  public static final int ESTIMATED_HEAP_TAX = 48;
+
   /** Default constructor used in conjunction with Writable interface */
   public HStoreKey() {
     super();
@@ -201,11 +210,6 @@ public class HStoreKey implements WritableComparable<HStoreKey> {
     this.regionInfo = regionInfo;
   }
 
-  /** @return Approximate size in bytes of this key. */
-  public long getSize() {
-    return getRow().length + getColumn().length + Bytes.SIZEOF_LONG;
-  }
-
   /**
    * Constructs a new HStoreKey from another
    *
@@ -587,6 +591,12 @@ public class HStoreKey implements WritableComparable<HStoreKey> {
     this.timestamp = in.readLong();
   }
 
+  public long heapSize() {
+    return getRow().length + Bytes.ESTIMATED_HEAP_TAX +
+      getColumn().length + Bytes.ESTIMATED_HEAP_TAX +
+      ESTIMATED_HEAP_TAX;
+  }
+
   /**
    * Passed as comparator for memcache and for store files.  See HBASE-868.
    */
@@ -649,8 +659,8 @@ public class HStoreKey implements WritableComparable<HStoreKey> {
     }
 
     @Override
-    public long getSize() {
-      return this.beforeThisKey.getSize();
+    public long heapSize() {
+      return this.beforeThisKey.heapSize();
     }
 
     @Override

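To make the new accounting concrete, here is a worked example of heapSize() against the old getSize() (the key values are hypothetical; the taxes come from the constants above):

    // 10-byte row, 12-byte column, current timestamp.
    HStoreKey k = new HStoreKey(Bytes.toBytes("row-000001"),
      Bytes.toBytes("info:colname"), System.currentTimeMillis());
    // heapSize() = 10 + 16   (row bytes + byte [] tax)
    //            + 12 + 16   (column bytes + byte [] tax)
    //            + 48        (HStoreKey shallow-size tax)
    //            = 102 bytes
    // The old getSize() counted only 10 + 12 + 8 (timestamp) = 30 bytes.
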
HConnectionManager.java
@@ -854,9 +854,6 @@ public class HConnectionManager implements HConstants {
         throw new RetriesExhaustedException(callable.getServerName(),
           callable.getRegionName(), callable.getRow(), tries, exceptions);
       }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("reloading table servers because: " + t.getMessage());
-      }
     }
     try {
       Thread.sleep(getPauseTime(tries));

HTable.java
@@ -113,7 +113,7 @@ public class HTable {
     this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
     this.writeBuffer = new ArrayList<BatchUpdate>();
     this.writeBufferSize =
-      this.configuration.getLong("hbase.client.write.buffer", 10485760);
+      this.configuration.getLong("hbase.client.write.buffer", 2097152);
     this.autoFlush = true;
     this.currentWriteBufferSize = 0;
     this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 30);
@@ -1233,8 +1233,8 @@ public class HTable {
       batchUpdate.setRowLock(rl.getLockId());
     }
     writeBuffer.add(batchUpdate);
-    currentWriteBufferSize += batchUpdate.getSize();
-    if(autoFlush || currentWriteBufferSize > writeBufferSize) {
+    currentWriteBufferSize += batchUpdate.heapSize();
+    if (autoFlush || currentWriteBufferSize > writeBufferSize) {
       flushCommits();
     }
   }
@@ -1247,12 +1247,12 @@ public class HTable {
    */
   public synchronized void commit(final List<BatchUpdate> batchUpdates)
   throws IOException {
-    for(BatchUpdate bu : batchUpdates) {
+    for (BatchUpdate bu : batchUpdates) {
       checkRowAndColumns(bu);
       writeBuffer.add(bu);
-      currentWriteBufferSize += bu.getSize();
+      currentWriteBufferSize += bu.heapSize();
     }
-    if(autoFlush || currentWriteBufferSize > writeBufferSize) {
+    if (autoFlush || currentWriteBufferSize > writeBufferSize) {
       flushCommits();
     }
   }

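For context, the write buffer only accumulates when autoFlush is off; a minimal client sketch (table and column names assumed for illustration):

    HBaseConfiguration conf = new HBaseConfiguration();
    HTable table = new HTable(conf, "mytable");
    table.setAutoFlush(false);  // let commits pool client-side
    for (int i = 0; i < 100000; i++) {
      BatchUpdate bu = new BatchUpdate(Bytes.toBytes("row" + i));
      bu.put(Bytes.toBytes("info:data"), Bytes.toBytes(i));
      // Flushes itself once the summed heapSize() exceeds writeBufferSize.
      table.commit(bu);
    }
    table.flushCommits();  // push the remainder

With heapSize() feeding currentWriteBufferSize, the buffer now flushes on its true memory footprint rather than on the smaller payload-only count, so it trips sooner.
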
BatchOperation.java
@@ -35,11 +35,17 @@ import org.apache.hadoop.io.Writable;
  * a class per type because it makes the serialization easier.
  * @see BatchUpdate
  */
-public class BatchOperation implements Writable {
+public class BatchOperation implements Writable, HeapSize {
+  /**
+   * Estimated size of this object.
+   */
+  // JHat says this is 32 bytes.
+  public final int ESTIMATED_HEAP_TAX = 36;
+
   private byte [] column = null;
 
   // A null value defines DELETE operations.
-  private byte[] value = null;
+  private byte [] value = null;
 
   /**
    * Default constructor
@@ -132,4 +138,9 @@ public class BatchOperation implements Writable {
       out.write(value);
     }
   }
+
+  public long heapSize() {
+    return Bytes.ESTIMATED_HEAP_TAX * 2 + this.column.length +
+      this.value.length + ESTIMATED_HEAP_TAX;
+  }
 }

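Worked through with the constants above (column and value sizes hypothetical):

    // column "info:x" = 6 bytes, value = 10 bytes
    // heapSize() = 16 * 2 (two byte [] taxes) + 6 + 10 + 36 = 84 bytes
    // The old accounting charged only val.length + column.length = 16 bytes.
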
BatchUpdate.java
@@ -22,12 +22,15 @@ package org.apache.hadoop.hbase.io;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.RowLock;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableComparable;
@@ -38,8 +41,15 @@ import org.apache.hadoop.io.WritableComparable;
  * can result in multiple BatchUpdate objects if the batch contains rows that
  * are served by multiple region servers.
  */
-public class BatchUpdate implements WritableComparable<BatchUpdate>,
-  Iterable<BatchOperation> {
+public class BatchUpdate
+implements WritableComparable<BatchUpdate>, Iterable<BatchOperation>, HeapSize {
+  private static final Log LOG = LogFactory.getLog(BatchUpdate.class);
+
+  /**
+   * Estimated 'shallow size' of this object not counting payload.
+   */
+  // Shallow size is 56.  Add 32 for the arraylist below.
+  public static final int ESTIMATED_HEAP_TAX = 56 + 32;
+
   // the row being updated
   private byte [] row = null;
@@ -141,13 +151,6 @@ public class BatchUpdate implements WritableComparable<BatchUpdate>,
     return row;
   }
 
-  /**
-   * @return BatchUpdate size in bytes.
-   */
-  public long getSize() {
-    return size;
-  }
-
   /**
    * @return the timestamp this BatchUpdate will be committed with.
    */
@@ -247,8 +250,9 @@ public class BatchUpdate implements WritableComparable<BatchUpdate>,
       // If null, the PUT becomes a DELETE operation.
       throw new IllegalArgumentException("Passed value cannot be null");
     }
-    size += val.length + column.length;
-    operations.add(new BatchOperation(column, val));
+    BatchOperation bo = new BatchOperation(column, val);
+    this.size += bo.heapSize();
+    operations.add(bo);
   }
 
   /**
@@ -336,4 +340,48 @@ public class BatchUpdate implements WritableComparable<BatchUpdate>,
   public int compareTo(BatchUpdate o) {
     return Bytes.compareTo(this.row, o.getRow());
   }
+
+  public long heapSize() {
+    return this.row.length + Bytes.ESTIMATED_HEAP_TAX + this.size +
+      ESTIMATED_HEAP_TAX;
+  }
+
+  /**
+   * Code to test sizes of BatchUpdate arrays.
+   * @param args
+   * @throws InterruptedException
+   */
+  public static void main(String[] args) throws InterruptedException {
+    RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
+    LOG.info("vmName=" + runtime.getVmName() + ", vmVendor="
+      + runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
+    LOG.info("vmInputArguments=" + runtime.getInputArguments());
+    final int count = 10000;
+    BatchUpdate[] batch1 = new BatchUpdate[count];
+    // TODO: x32 vs x64
+    long size = 0;
+    for (int i = 0; i < count; i++) {
+      BatchUpdate bu = new BatchUpdate(HConstants.EMPTY_BYTE_ARRAY);
+      bu.put(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
+      batch1[i] = bu;
+      size += bu.heapSize();
+    }
+    LOG.info("batch1 estimated size=" + size);
+    // Make a variably sized batch.
+    size = 0;
+    BatchUpdate[] batch2 = new BatchUpdate[count];
+    for (int i = 0; i < count; i++) {
+      BatchUpdate bu = new BatchUpdate(Bytes.toBytes(i));
+      bu.put(Bytes.toBytes(i), new byte[i]);
+      batch2[i] = bu;
+      size += bu.heapSize();
+    }
+    LOG.info("batch2 estimated size=" + size);
+    final int seconds = 30;
+    LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
+    for (int i = 0; i < seconds; i++) {
+      Thread.sleep(1000);
+    }
+    LOG.info("Exiting.");
+  }
 }

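The main() above (and the matching one added to Memcache below) is a sizing harness rather than production code. A plausible way to run it (classpath illustrative, adjust to your build layout):

    java -cp <hbase classes and dependent jars> org.apache.hadoop.hbase.io.BatchUpdate

During the 30-second wait, take a heap dump (e.g. jmap -dump:format=b,file=bu.hprof <pid>) and compare the logged estimates against the 'deep size' that jhat or jprofiler reports.
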
HeapSize.java (new file)
@@ -0,0 +1,33 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+/**
+ * Implementations can be asked for an estimate of their size in bytes.
+ * Useful for sizing caches.  It's a given that implementation approximations
+ * probably do not account for 32 vs 64 bit nor for different VM implementations.
+ */
+public interface HeapSize {
+  /**
+   * @return Approximate 'exclusive deep size' of implementing object.  Includes
+   * count of payload and hosting object sizings.
+   */
+  public long heapSize();
+}

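A minimal sketch of an implementation, following the tax-constant pattern the commit uses elsewhere (the class and its 24-byte shallow estimate are hypothetical):

    package org.apache.hadoop.hbase.io;

    import org.apache.hadoop.hbase.util.Bytes;

    /** Hypothetical cache entry that can report its own heap cost. */
    public class SizedEntry implements HeapSize {
      // Assumed shallow-size tax; measure with jhat/jprofiler as the commit does.
      public static final int ESTIMATED_HEAP_TAX = 24;

      private final byte [] payload;

      public SizedEntry(final byte [] payload) {
        this.payload = payload;
      }

      public long heapSize() {
        // Payload bytes + byte [] overhead + this object's own tax.
        return this.payload.length + Bytes.ESTIMATED_HEAP_TAX + ESTIMATED_HEAP_TAX;
      }
    }
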
HStore.java
@@ -634,8 +634,9 @@ public class HStore implements HConstants {
     return compactionNeeded;
   }
 
-  private boolean internalFlushCache(SortedMap<HStoreKey, byte []> cache,
-    long logCacheFlushId) throws IOException {
+  private boolean internalFlushCache(final SortedMap<HStoreKey, byte []> cache,
+    final long logCacheFlushId)
+  throws IOException {
     long flushed = 0;
     // Don't flush if there are no entries.
     if (cache.size() == 0) {
@@ -674,7 +675,7 @@ public class HStore implements HConstants {
           if (!isExpired(curkey, ttl, now)) {
             entries++;
             out.append(curkey, new ImmutableBytesWritable(bytes));
-            flushed += curkey.getSize() + (bytes == null ? 0 : bytes.length);
+            flushed += this.memcache.heapSize(curkey, bytes, null);
           }
         }
       }
@@ -693,7 +694,7 @@ public class HStore implements HConstants {
       if(LOG.isDebugEnabled()) {
         LOG.debug("Added " + FSUtils.getPath(flushedFile.getMapFilePath()) +
           " with " + entries +
-          " entries, sequence id " + logCacheFlushId + ", data size " +
+          " entries, sequence id " + logCacheFlushId + ", data size ~" +
           StringUtils.humanReadableInt(flushed) + ", file size " +
           StringUtils.humanReadableInt(newStoreSize) + " to " +
           this.info.getRegionNameAsString());

Memcache.java
@@ -21,6 +21,8 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
 import java.rmi.UnexpectedException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -52,7 +54,7 @@ import org.apache.hadoop.hbase.util.Bytes;
  * this point we let the snapshot go.
  */
 class Memcache {
-  private final Log LOG = LogFactory.getLog(this.getClass().getName());
+  private static final Log LOG = LogFactory.getLog(Memcache.class);
 
   private final long ttl;
 
@@ -170,18 +172,48 @@ class Memcache {
    * Write an update
    * @param key
    * @param value
-   * @return memcache size delta
+   * @return Approximate size of the passed key and value.  Includes
+   * cost of hosting HSK and byte arrays as well as the Map.Entry this addition
+   * costs when we insert into the backing TreeMap.
    */
   long add(final HStoreKey key, final byte[] value) {
+    long size = -1;
     this.lock.readLock().lock();
     try {
-      byte[] oldValue = this.memcache.remove(key);
+      byte [] oldValue = this.memcache.remove(key);
       this.memcache.put(key, value);
-      return key.getSize() + (value == null ? 0 : value.length) -
-        (oldValue == null ? 0 : oldValue.length);
+      size = heapSize(key, value, oldValue);
     } finally {
       this.lock.readLock().unlock();
     }
+    return size;
   }
+
+  /*
+   * Calculate how the memcache size has changed, approximately.
+   * Adds in the tax of a TreeMap.Entry.
+   * @param key
+   * @param value
+   * @param oldValue
+   * @return
+   */
+  long heapSize(final HStoreKey key, final byte [] value,
+      final byte [] oldValue) {
+    // First get the key size.
+    long keySize = key.heapSize();
+    // Add value.
+    long size = value == null? 0: value.length;
+    if (oldValue == null) {
+      size += keySize;
+      // Add overhead for value byte array and for Map.Entry -- 57 bytes
+      // on x64 according to jprofiler.
+      size += Bytes.ESTIMATED_HEAP_TAX + 57;
+    } else {
+      // If old value, don't add overhead again nor key size.  Just add
+      // difference in value sizes.
+      size -= oldValue.length;
+    }
+    return size;
+  }
 
   /**
@@ -835,4 +867,47 @@ class Memcache {
       }
     }
   }
+
+  /**
+   * Code to help figure if our approximation of object heap sizes is close
+   * enough.  See hbase-900.  Fills memcaches then waits so user can heap
+   * dump and bring up resultant hprof in something like jprofiler which
+   * allows you to get 'deep size' on objects.
+   * @param args
+   * @throws InterruptedException
+   */
+  public static void main(String [] args) throws InterruptedException {
+    RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
+    LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
+      runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
+    LOG.info("vmInputArguments=" + runtime.getInputArguments());
+    Memcache memcache1 = new Memcache();
+    // TODO: x32 vs x64
+    long size = 0;
+    final int count = 10000;
+    for (int i = 0; i < count; i++) {
+      size += memcache1.add(new HStoreKey(Bytes.toBytes(i)),
+        HConstants.EMPTY_BYTE_ARRAY);
+    }
+    LOG.info("memcache1 estimated size=" + size);
+    for (int i = 0; i < count; i++) {
+      size += memcache1.add(new HStoreKey(Bytes.toBytes(i)),
+        HConstants.EMPTY_BYTE_ARRAY);
+    }
+    LOG.info("memcache1 estimated size (2nd loading of same data)=" + size);
+    // Make a variably sized memcache.
+    Memcache memcache2 = new Memcache();
+    for (int i = 0; i < count; i++) {
+      byte [] b = Bytes.toBytes(i);
+      size += memcache2.add(new HStoreKey(b, b),
+        new byte [i]);
+    }
+    LOG.info("memcache2 estimated size=" + size);
+    final int seconds = 30;
+    LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
+    for (int i = 0; i < seconds; i++) {
+      Thread.sleep(1000);
+    }
+    LOG.info("Exiting.");
+  }
 }

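A worked trace of the new delta accounting (values hypothetical; 57 is the commit's x64 Map.Entry figure, and the single-argument HStoreKey constructor is assumed to leave an empty column):

    Memcache mc = new Memcache();
    HStoreKey k = new HStoreKey(Bytes.toBytes(1));  // 4-byte row
    long d1 = mc.add(k, new byte [100]);
    // No old value: 100 (value) + 84 (k.heapSize(): 4+16+0+16+48)
    //             + 16 (value byte [] tax) + 57 (Map.Entry) = 257
    long d2 = mc.add(k, new byte [40]);
    // Old value present: only the length difference counts, 40 - 100 = -60

Note the second add returns a negative delta, which is what lets the tracked memcache size shrink when a row is overwritten with a smaller value.
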
Bytes.java
@@ -38,6 +38,13 @@ public class Bytes {
    */
   public static final int SIZEOF_DOUBLE = Double.SIZE/Byte.SIZE;
 
+  /**
+   * Estimate of size cost to pay beyond payload in jvm for instance of byte [].
+   * Estimate based on study of jhat and jprofiler numbers.
+   */
+  // JHat says BU is 56 bytes.
+  public static final int ESTIMATED_HEAP_TAX = 16;
+
   /**
    * Pass this to TreeMaps where byte [] are keys.
    */
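
For reference, the 16-byte figure squares with a typical 32-bit HotSpot array layout (breakdown indicative, not from the commit):

    // 8-byte object header + 4-byte array length field + 4 bytes alignment padding
    // = 16 bytes of overhead before the first payload byte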