HBASE-900 Regionserver memory leak causing OOME during relatively modest bulk importing; part 4 (final part)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@728111 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
22d20e92fb
commit
d63c9ce540
|
@ -924,15 +924,17 @@ public class HConnectionManager implements HConstants {
|
|||
currentRegion, batchUpdate.getRow(),
|
||||
tries, new ArrayList<Throwable>());
|
||||
}
|
||||
long sleepTime = getPauseTime(tries);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("reloading table servers because region " +
|
||||
"server didn't accept updates ");
|
||||
LOG.debug("Eeloading table servers because region " +
|
||||
"server didn't accept updates; tries=" + tries +
|
||||
" of max=" + this.numRetries + ", waiting=" + sleepTime + "ms");
|
||||
}
|
||||
// Basic waiting time. If many updates are flushed, tests have shown
|
||||
// that this is barely needed but when commiting 1 update this may
|
||||
// get retried hundreds of times.
|
||||
try {
|
||||
Thread.sleep(getPauseTime(tries));
|
||||
Thread.sleep(sleepTime);
|
||||
tries++;
|
||||
} catch (InterruptedException e) {
|
||||
// continue
|
||||
|
|
|
@ -21,7 +21,10 @@ package org.apache.hadoop.hbase.io;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -33,18 +36,31 @@ import org.apache.hadoop.io.DataInputBuffer;
|
|||
|
||||
/**
|
||||
* An implementation of {@link FSInputStream} that reads the stream in blocks
|
||||
* of a fixed, configurable size. The blocks are stored in a memory-sensitive cache.
|
||||
* of a fixed, configurable size. The blocks are stored in a memory-sensitive
|
||||
* cache. Implements Runnable. Run it on a period so we clean up soft
|
||||
* references from the reference queue.
|
||||
*/
|
||||
public class BlockFSInputStream extends FSInputStream {
|
||||
|
||||
static final Log LOG = LogFactory.getLog(BlockFSInputStream.class);
|
||||
/*
|
||||
* Set up scheduled execution of cleanup of soft references. Run with one
|
||||
* thread for now. May need more when many files. Should be an option but
|
||||
* also want BlockFSInputStream to be self-contained.
|
||||
*/
|
||||
private static final ScheduledExecutorService EXECUTOR =
|
||||
Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
/*
|
||||
* The registration of this object in EXECUTOR.
|
||||
*/
|
||||
private final ScheduledFuture<?> registration;
|
||||
|
||||
private final InputStream in;
|
||||
|
||||
private final long fileLength;
|
||||
|
||||
private final int blockSize;
|
||||
private final Map<Long, byte[]> blocks;
|
||||
private final SoftValueMap<Long, byte[]> blocks;
|
||||
|
||||
private boolean closed;
|
||||
|
||||
|
@ -84,6 +100,15 @@ public class BlockFSInputStream extends FSInputStream {
|
|||
return value;
|
||||
}
|
||||
};
|
||||
// Register a Runnable that runs checkReferences on a period.
|
||||
this.registration = EXECUTOR.scheduleAtFixedRate(new Runnable() {
|
||||
public void run() {
|
||||
int cleared = checkReferences();
|
||||
if (LOG.isDebugEnabled() && cleared > 0) {
|
||||
LOG.debug("Cleared " + cleared);
|
||||
}
|
||||
}
|
||||
}, 10, 10, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -176,6 +201,9 @@ public class BlockFSInputStream extends FSInputStream {
|
|||
if (closed) {
|
||||
throw new IOException("Stream closed");
|
||||
}
|
||||
if (!this.registration.cancel(false)) {
|
||||
LOG.warn("Failed cancel of " + this.registration);
|
||||
}
|
||||
if (blockStream != null) {
|
||||
blockStream.close();
|
||||
blockStream = null;
|
||||
|
@ -203,4 +231,14 @@ public class BlockFSInputStream extends FSInputStream {
|
|||
throw new IOException("Mark not supported");
|
||||
}
|
||||
|
||||
}
|
||||
/**
|
||||
* Call frequently to clear Soft Reference Reference Queue.
|
||||
* @return Count of references cleared.
|
||||
*/
|
||||
public synchronized int checkReferences() {
|
||||
if (closed || this.blocks == null) {
|
||||
return 0;
|
||||
}
|
||||
return this.blocks.checkReferences();
|
||||
}
|
||||
}
|
|
@ -34,7 +34,7 @@ import org.apache.hadoop.io.SequenceFile;
|
|||
import org.apache.hadoop.io.Writable;
|
||||
|
||||
/**
|
||||
* Hbase customizations of MapFile.
|
||||
* HBase customizations of MapFile.
|
||||
*/
|
||||
public class HBaseMapFile extends MapFile {
|
||||
private static final Log LOG = LogFactory.getLog(HBaseMapFile.class);
|
||||
|
@ -132,4 +132,4 @@ public class HBaseMapFile extends MapFile {
|
|||
setIndexInterval(conf.getInt("hbase.io.index.interval", 32));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,65 +0,0 @@
|
|||
/**
|
||||
* Copyright 2008 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import java.lang.ref.ReferenceQueue;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* java.lang.ref.ReferenceQueue utility class.
|
||||
* @param <K>
|
||||
* @param <V>
|
||||
*/
|
||||
class ReferenceQueueUtil<K,V> {
|
||||
private final Log LOG = LogFactory.getLog(this.getClass());
|
||||
private final ReferenceQueue rq = new ReferenceQueue();
|
||||
private final Map<K,V> map;
|
||||
|
||||
private ReferenceQueueUtil() {
|
||||
this(null);
|
||||
}
|
||||
|
||||
ReferenceQueueUtil(final Map<K,V> m) {
|
||||
super();
|
||||
this.map = m;
|
||||
}
|
||||
|
||||
public ReferenceQueue getReferenceQueue() {
|
||||
return rq;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check the reference queue and delete anything that has since gone away
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
void checkReferences() {
|
||||
int i = 0;
|
||||
for (Object obj = null; (obj = this.rq.poll()) != null;) {
|
||||
i++;
|
||||
this.map.remove(((SoftValue<K,V>)obj).getKey());
|
||||
}
|
||||
if (i > 0 && LOG.isDebugEnabled()) {
|
||||
LOG.debug("" + i + " reference(s) cleared.");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,6 +19,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import java.lang.ref.ReferenceQueue;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
|
@ -35,13 +36,34 @@ import java.util.Set;
|
|||
public class SoftValueMap<K,V> implements Map<K,V> {
|
||||
private final Map<K, SoftValue<K,V>> internalMap =
|
||||
new HashMap<K, SoftValue<K,V>>();
|
||||
private final ReferenceQueueUtil<K,SoftValue<K,V>> rq =
|
||||
new ReferenceQueueUtil<K,SoftValue<K,V>>(this.internalMap);
|
||||
private final ReferenceQueue<?> rq;
|
||||
|
||||
public SoftValueMap() {
|
||||
this(new ReferenceQueue());
|
||||
}
|
||||
|
||||
public SoftValueMap(final ReferenceQueue<?> rq) {
|
||||
this.rq = rq;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks soft references and cleans any that have been placed on
|
||||
* ReferenceQueue.
|
||||
* @return How many references cleared.
|
||||
*/
|
||||
public int checkReferences() {
|
||||
int i = 0;
|
||||
for (Object obj = null; (obj = this.rq.poll()) != null;) {
|
||||
i++;
|
||||
this.internalMap.remove(((SoftValue<K,V>)obj).getKey());
|
||||
}
|
||||
return i;
|
||||
}
|
||||
|
||||
public V put(K key, V value) {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
SoftValue<K,V> oldValue = this.internalMap.put(key,
|
||||
new SoftValue<K,V>(key, value, this.rq.getReferenceQueue()));
|
||||
new SoftValue<K,V>(key, value, this.rq));
|
||||
return oldValue == null ? null : oldValue.get();
|
||||
}
|
||||
|
||||
|
@ -51,7 +73,7 @@ public class SoftValueMap<K,V> implements Map<K,V> {
|
|||
}
|
||||
|
||||
public V get(Object key) {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
SoftValue<K,V> value = this.internalMap.get(key);
|
||||
if (value == null) {
|
||||
return null;
|
||||
|
@ -64,13 +86,13 @@ public class SoftValueMap<K,V> implements Map<K,V> {
|
|||
}
|
||||
|
||||
public V remove(Object key) {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
SoftValue<K,V> value = this.internalMap.remove(key);
|
||||
return value == null ? null : value.get();
|
||||
}
|
||||
|
||||
public boolean containsKey(Object key) {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
return this.internalMap.containsKey(key);
|
||||
}
|
||||
|
||||
|
@ -81,27 +103,27 @@ public class SoftValueMap<K,V> implements Map<K,V> {
|
|||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
return this.internalMap.isEmpty();
|
||||
}
|
||||
|
||||
public int size() {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
return this.internalMap.size();
|
||||
}
|
||||
|
||||
public void clear() {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
this.internalMap.clear();
|
||||
}
|
||||
|
||||
public Set<K> keySet() {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
return this.internalMap.keySet();
|
||||
}
|
||||
|
||||
public Set<Map.Entry<K,V>> entrySet() {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
Set<Map.Entry<K, SoftValue<K,V>>> entries = this.internalMap.entrySet();
|
||||
Set<Map.Entry<K, V>> real_entries = new HashSet<Map.Entry<K,V>>();
|
||||
for(Map.Entry<K, SoftValue<K,V>> entry : entries) {
|
||||
|
@ -111,7 +133,7 @@ public class SoftValueMap<K,V> implements Map<K,V> {
|
|||
}
|
||||
|
||||
public Collection<V> values() {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
Collection<SoftValue<K,V>> softValues = this.internalMap.values();
|
||||
ArrayList<V> hardValues = new ArrayList<V>();
|
||||
for(SoftValue<K,V> softValue : softValues) {
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import java.lang.ref.ReferenceQueue;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
|
@ -38,7 +39,7 @@ import java.util.TreeSet;
|
|||
*/
|
||||
public class SoftValueSortedMap<K,V> implements SortedMap<K,V> {
|
||||
private final SortedMap<K, SoftValue<K,V>> internalMap;
|
||||
private final ReferenceQueueUtil<K,SoftValue<K,V>> rq;
|
||||
private final ReferenceQueue rq = new ReferenceQueue();
|
||||
|
||||
/** Constructor */
|
||||
public SoftValueSortedMap() {
|
||||
|
@ -56,13 +57,27 @@ public class SoftValueSortedMap<K,V> implements SortedMap<K,V> {
|
|||
/** For headMap and tailMap support */
|
||||
private SoftValueSortedMap(SortedMap<K,SoftValue<K,V>> original) {
|
||||
this.internalMap = original;
|
||||
this.rq = new ReferenceQueueUtil<K,SoftValue<K,V>>(this.internalMap);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Checks soft references and cleans any that have been placed on
|
||||
* ReferenceQueue. Call if get/put etc. are not called regularly.
|
||||
* Internally these call checkReferences on each access.
|
||||
* @return How many references cleared.
|
||||
*/
|
||||
public int checkReferences() {
|
||||
int i = 0;
|
||||
for (Object obj = null; (obj = this.rq.poll()) != null;) {
|
||||
i++;
|
||||
this.internalMap.remove(((SoftValue<K,V>)obj).getKey());
|
||||
}
|
||||
return i;
|
||||
}
|
||||
|
||||
public V put(K key, V value) {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
SoftValue<K,V> oldValue = this.internalMap.put(key,
|
||||
new SoftValue<K,V>(key, value, this.rq.getReferenceQueue()));
|
||||
new SoftValue<K,V>(key, value, this.rq));
|
||||
return oldValue == null ? null : oldValue.get();
|
||||
}
|
||||
|
||||
|
@ -72,7 +87,7 @@ public class SoftValueSortedMap<K,V> implements SortedMap<K,V> {
|
|||
}
|
||||
|
||||
public V get(Object key) {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
SoftValue<K,V> value = this.internalMap.get(key);
|
||||
if (value == null) {
|
||||
return null;
|
||||
|
@ -85,13 +100,13 @@ public class SoftValueSortedMap<K,V> implements SortedMap<K,V> {
|
|||
}
|
||||
|
||||
public V remove(Object key) {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
SoftValue<K,V> value = this.internalMap.remove(key);
|
||||
return value == null ? null : value.get();
|
||||
}
|
||||
|
||||
public boolean containsKey(Object key) {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
return this.internalMap.containsKey(key);
|
||||
}
|
||||
|
||||
|
@ -102,47 +117,47 @@ public class SoftValueSortedMap<K,V> implements SortedMap<K,V> {
|
|||
}
|
||||
|
||||
public K firstKey() {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
return internalMap.firstKey();
|
||||
}
|
||||
|
||||
public K lastKey() {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
return internalMap.lastKey();
|
||||
}
|
||||
|
||||
public SoftValueSortedMap<K,V> headMap(K key) {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
return new SoftValueSortedMap<K,V>(this.internalMap.headMap(key));
|
||||
}
|
||||
|
||||
public SoftValueSortedMap<K,V> tailMap(K key) {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
return new SoftValueSortedMap<K,V>(this.internalMap.tailMap(key));
|
||||
}
|
||||
|
||||
public SoftValueSortedMap<K,V> subMap(K fromKey, K toKey) {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
return new SoftValueSortedMap<K,V>(this.internalMap.subMap(fromKey, toKey));
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
return this.internalMap.isEmpty();
|
||||
}
|
||||
|
||||
public int size() {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
return this.internalMap.size();
|
||||
}
|
||||
|
||||
public void clear() {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
this.internalMap.clear();
|
||||
}
|
||||
|
||||
public Set<K> keySet() {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
return this.internalMap.keySet();
|
||||
}
|
||||
|
||||
|
@ -152,7 +167,7 @@ public class SoftValueSortedMap<K,V> implements SortedMap<K,V> {
|
|||
}
|
||||
|
||||
public Set<Map.Entry<K,V>> entrySet() {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
Set<Map.Entry<K, SoftValue<K,V>>> entries = this.internalMap.entrySet();
|
||||
Set<Map.Entry<K, V>> real_entries = new TreeSet<Map.Entry<K,V>>();
|
||||
for(Map.Entry<K, SoftValue<K,V>> entry : entries) {
|
||||
|
@ -162,7 +177,7 @@ public class SoftValueSortedMap<K,V> implements SortedMap<K,V> {
|
|||
}
|
||||
|
||||
public Collection<V> values() {
|
||||
this.rq.checkReferences();
|
||||
checkReferences();
|
||||
Collection<SoftValue<K,V>> softValues = this.internalMap.values();
|
||||
ArrayList<V> hardValues = new ArrayList<V>();
|
||||
for(SoftValue<K,V> softValue : softValues) {
|
||||
|
|
Loading…
Reference in New Issue