HADOOP-1888 NullPointerException in HMemcacheScanner (reprise)
git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@575986 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b271048e2f
commit
20a6c00b5c
|
@ -47,7 +47,7 @@ Trunk (unreleased changes)
|
|||
HADOOP-1847 Many HBase tests do not fail well. (phase 2)
|
||||
HADOOP-1870 Once file system failure has been detected, don't check it again
|
||||
and get on with shutting down the hbase cluster.
|
||||
HADOOP-1888 NullPointerException in HMemcacheScanner
|
||||
HADOOP-1888 NullPointerException in HMemcacheScanner (reprise)
|
||||
HADOOP-1903 Possible data loss if Exception happens between snapshot and flush
|
||||
to disk.
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hbase;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -43,10 +44,10 @@ public class HMemcache {
|
|||
// Note that since these structures are always accessed with a lock held,
|
||||
// no additional synchronization is required.
|
||||
|
||||
TreeMap<HStoreKey, byte []> memcache = new TreeMap<HStoreKey, byte []>();
|
||||
final ArrayList<TreeMap<HStoreKey, byte []>> history =
|
||||
new ArrayList<TreeMap<HStoreKey, byte []>>();
|
||||
TreeMap<HStoreKey, byte []> snapshot = null;
|
||||
volatile SortedMap<HStoreKey, byte []> memcache;
|
||||
List<SortedMap<HStoreKey, byte []>> history =
|
||||
Collections.synchronizedList(new ArrayList<SortedMap<HStoreKey, byte []>>());
|
||||
volatile SortedMap<HStoreKey, byte []> snapshot = null;
|
||||
|
||||
final HLocking lock = new HLocking();
|
||||
|
||||
|
@ -62,14 +63,16 @@ public class HMemcache {
|
|||
*/
|
||||
public HMemcache() {
|
||||
super();
|
||||
memcache =
|
||||
Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
|
||||
}
|
||||
|
||||
/** represents the state of the memcache at a specified point in time */
|
||||
static class Snapshot {
|
||||
final TreeMap<HStoreKey, byte []> memcacheSnapshot;
|
||||
final SortedMap<HStoreKey, byte []> memcacheSnapshot;
|
||||
final long sequenceId;
|
||||
|
||||
Snapshot(final TreeMap<HStoreKey, byte[]> memcache, final Long i) {
|
||||
Snapshot(final SortedMap<HStoreKey, byte[]> memcache, final Long i) {
|
||||
super();
|
||||
this.memcacheSnapshot = memcache;
|
||||
this.sequenceId = i.longValue();
|
||||
|
@ -103,8 +106,11 @@ public class HMemcache {
|
|||
new Snapshot(memcache, Long.valueOf(log.startCacheFlush()));
|
||||
// From here on, any failure is catastrophic requiring replay of hlog
|
||||
this.snapshot = memcache;
|
||||
history.add(memcache);
|
||||
memcache = new TreeMap<HStoreKey, byte []>();
|
||||
synchronized (history) {
|
||||
history.add(memcache);
|
||||
}
|
||||
memcache =
|
||||
Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
|
||||
// Reset size of this memcache.
|
||||
this.size.set(0);
|
||||
return retval;
|
||||
|
@ -126,14 +132,8 @@ public class HMemcache {
|
|||
if(snapshot == null) {
|
||||
throw new IOException("Snapshot not present!");
|
||||
}
|
||||
for (Iterator<TreeMap<HStoreKey, byte []>> it = history.iterator();
|
||||
it.hasNext(); ) {
|
||||
|
||||
TreeMap<HStoreKey, byte []> cur = it.next();
|
||||
if (snapshot == cur) {
|
||||
it.remove();
|
||||
break;
|
||||
}
|
||||
synchronized (history) {
|
||||
history.remove(snapshot);
|
||||
}
|
||||
this.snapshot = null;
|
||||
} finally {
|
||||
|
@ -182,12 +182,14 @@ public class HMemcache {
|
|||
this.lock.obtainReadLock();
|
||||
try {
|
||||
ArrayList<byte []> results = get(memcache, key, numVersions);
|
||||
for (int i = history.size() - 1; i >= 0; i--) {
|
||||
if (numVersions > 0 && results.size() >= numVersions) {
|
||||
break;
|
||||
synchronized (history) {
|
||||
for (int i = history.size() - 1; i >= 0; i--) {
|
||||
if (numVersions > 0 && results.size() >= numVersions) {
|
||||
break;
|
||||
}
|
||||
results.addAll(results.size(),
|
||||
get(history.get(i), key, numVersions - results.size()));
|
||||
}
|
||||
results.addAll(results.size(),
|
||||
get(history.get(i), key, numVersions - results.size()));
|
||||
}
|
||||
return (results.size() == 0) ? null :
|
||||
ImmutableBytesWritable.toArray(results);
|
||||
|
@ -210,9 +212,11 @@ public class HMemcache {
|
|||
this.lock.obtainReadLock();
|
||||
try {
|
||||
internalGetFull(memcache, key, results);
|
||||
for (int i = history.size() - 1; i >= 0; i--) {
|
||||
TreeMap<HStoreKey, byte []> cur = history.get(i);
|
||||
internalGetFull(cur, key, results);
|
||||
synchronized (history) {
|
||||
for (int i = history.size() - 1; i >= 0; i--) {
|
||||
SortedMap<HStoreKey, byte []> cur = history.get(i);
|
||||
internalGetFull(cur, key, results);
|
||||
}
|
||||
}
|
||||
return results;
|
||||
|
||||
|
@ -221,7 +225,7 @@ public class HMemcache {
|
|||
}
|
||||
}
|
||||
|
||||
void internalGetFull(TreeMap<HStoreKey, byte []> map, HStoreKey key,
|
||||
void internalGetFull(SortedMap<HStoreKey, byte []> map, HStoreKey key,
|
||||
TreeMap<Text, byte []> results) {
|
||||
SortedMap<HStoreKey, byte []> tailMap = map.tailMap(key);
|
||||
for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
|
||||
|
@ -252,7 +256,7 @@ public class HMemcache {
|
|||
* @return Ordered list of items found in passed <code>map</code>. If no
|
||||
* matching values, returns an empty list (does not return null).
|
||||
*/
|
||||
ArrayList<byte []> get(final TreeMap<HStoreKey, byte []> map,
|
||||
ArrayList<byte []> get(final SortedMap<HStoreKey, byte []> map,
|
||||
final HStoreKey key, final int numVersions) {
|
||||
ArrayList<byte []> result = new ArrayList<byte []>();
|
||||
// TODO: If get is of a particular version -- numVersions == 1 -- we
|
||||
|
@ -289,10 +293,12 @@ public class HMemcache {
|
|||
this.lock.obtainReadLock();
|
||||
try {
|
||||
List<HStoreKey> results = getKeys(this.memcache, origin, versions);
|
||||
for (int i = history.size() - 1; i >= 0; i--) {
|
||||
results.addAll(results.size(), getKeys(history.get(i), origin,
|
||||
versions == HConstants.ALL_VERSIONS ? versions :
|
||||
(versions - results.size())));
|
||||
synchronized (history) {
|
||||
for (int i = history.size() - 1; i >= 0; i--) {
|
||||
results.addAll(results.size(), getKeys(history.get(i), origin,
|
||||
versions == HConstants.ALL_VERSIONS ? versions :
|
||||
(versions - results.size())));
|
||||
}
|
||||
}
|
||||
return results;
|
||||
} finally {
|
||||
|
@ -308,7 +314,7 @@ public class HMemcache {
|
|||
* equal or older timestamp. If no keys, returns an empty List. Does not
|
||||
* return null.
|
||||
*/
|
||||
private List<HStoreKey> getKeys(final TreeMap<HStoreKey, byte []> map,
|
||||
private List<HStoreKey> getKeys(final SortedMap<HStoreKey, byte []> map,
|
||||
final HStoreKey origin, final int versions) {
|
||||
List<HStoreKey> result = new ArrayList<HStoreKey>();
|
||||
SortedMap<HStoreKey, byte []> tailMap = map.tailMap(origin);
|
||||
|
@ -360,7 +366,7 @@ public class HMemcache {
|
|||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
class HMemcacheScanner extends HAbstractScanner {
|
||||
final TreeMap<HStoreKey, byte []> backingMaps[];
|
||||
SortedMap<HStoreKey, byte []> backingMaps[];
|
||||
final Iterator<HStoreKey> keyIterators[];
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -370,14 +376,16 @@ public class HMemcache {
|
|||
super(timestamp, targetCols);
|
||||
lock.obtainReadLock();
|
||||
try {
|
||||
this.backingMaps = new TreeMap[history.size() + 1];
|
||||
synchronized (history) {
|
||||
this.backingMaps = new SortedMap[history.size() + 1];
|
||||
|
||||
// Note that since we iterate through the backing maps from 0 to n, we
|
||||
// need to put the memcache first, the newest history second, ..., etc.
|
||||
// Note that since we iterate through the backing maps from 0 to n, we
|
||||
// need to put the memcache first, the newest history second, ..., etc.
|
||||
|
||||
backingMaps[0] = memcache;
|
||||
for (int i = history.size() - 1; i > 0; i--) {
|
||||
backingMaps[i + 1] = history.get(i);
|
||||
backingMaps[0] = memcache;
|
||||
for (int i = history.size() - 1; i >= 0; i--) {
|
||||
backingMaps[i + 1] = history.get(i);
|
||||
}
|
||||
}
|
||||
|
||||
this.keyIterators = new Iterator[backingMaps.length];
|
||||
|
@ -388,9 +396,13 @@ public class HMemcache {
|
|||
|
||||
HStoreKey firstKey = new HStoreKey(firstRow);
|
||||
for (int i = 0; i < backingMaps.length; i++) {
|
||||
keyIterators[i] = firstRow.getLength() != 0 ?
|
||||
backingMaps[i].tailMap(firstKey).keySet().iterator() :
|
||||
backingMaps[i].keySet().iterator();
|
||||
if (firstRow != null && firstRow.getLength() != 0) {
|
||||
keyIterators[i] =
|
||||
backingMaps[i].tailMap(firstKey).keySet().iterator();
|
||||
|
||||
} else {
|
||||
keyIterators[i] = backingMaps[i].keySet().iterator();
|
||||
}
|
||||
|
||||
while (getNext(i)) {
|
||||
if (!findFirstRow(i, firstRow)) {
|
||||
|
|
|
@ -1615,6 +1615,7 @@ public class HRegion implements HConstants {
|
|||
return multipleMatchers;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public boolean next(HStoreKey key, SortedMap<Text, byte[]> results)
|
||||
throws IOException {
|
||||
// Filtered flag is set by filters. If a cell has been 'filtered out'
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.Vector;
|
||||
import java.util.Map.Entry;
|
||||
|
@ -439,13 +440,13 @@ class HStore implements HConstants {
|
|||
* @param logCacheFlushId flush sequence number
|
||||
* @throws IOException
|
||||
*/
|
||||
void flushCache(final TreeMap<HStoreKey, byte []> inputCache,
|
||||
void flushCache(final SortedMap<HStoreKey, byte []> inputCache,
|
||||
final long logCacheFlushId)
|
||||
throws IOException {
|
||||
flushCacheHelper(inputCache, logCacheFlushId, true);
|
||||
}
|
||||
|
||||
void flushCacheHelper(TreeMap<HStoreKey, byte []> inputCache,
|
||||
void flushCacheHelper(SortedMap<HStoreKey, byte []> inputCache,
|
||||
long logCacheFlushId, boolean addToAvailableMaps)
|
||||
throws IOException {
|
||||
synchronized(flushLock) {
|
||||
|
@ -1123,7 +1124,7 @@ class HStore implements HConstants {
|
|||
* @param key
|
||||
* @param numVersions Number of versions to fetch. Must be > 0.
|
||||
* @param memcache Checked for deletions
|
||||
* @return
|
||||
* @return values for the specified versions
|
||||
* @throws IOException
|
||||
*/
|
||||
byte [][] get(HStoreKey key, int numVersions, final HMemcache memcache)
|
||||
|
@ -1171,10 +1172,11 @@ class HStore implements HConstants {
|
|||
break;
|
||||
}
|
||||
}
|
||||
while ((readval = new ImmutableBytesWritable()) != null &&
|
||||
for (readval = new ImmutableBytesWritable();
|
||||
map.next(readkey, readval) &&
|
||||
readkey.matchesRowCol(key) &&
|
||||
!hasEnoughVersions(numVersions, results)) {
|
||||
!hasEnoughVersions(numVersions, results);
|
||||
readval = new ImmutableBytesWritable()) {
|
||||
if (!isDeleted(readkey, readval.get(), memcache, deletes)) {
|
||||
results.add(readval.get());
|
||||
}
|
||||
|
@ -1212,10 +1214,11 @@ class HStore implements HConstants {
|
|||
* @throws IOException
|
||||
*/
|
||||
List<HStoreKey> getKeys(final HStoreKey origin, List<HStoreKey> allKeys,
|
||||
final int versions)
|
||||
throws IOException {
|
||||
if (allKeys == null) {
|
||||
allKeys = new ArrayList<HStoreKey>();
|
||||
final int versions) throws IOException {
|
||||
|
||||
List<HStoreKey> keys = allKeys;
|
||||
if (keys == null) {
|
||||
keys = new ArrayList<HStoreKey>();
|
||||
}
|
||||
// This code below is very close to the body of the get method.
|
||||
this.lock.obtainReadLock();
|
||||
|
@ -1238,23 +1241,24 @@ class HStore implements HConstants {
|
|||
continue;
|
||||
}
|
||||
if (!isDeleted(readkey, readval.get(), null, null) &&
|
||||
!allKeys.contains(readkey)) {
|
||||
allKeys.add(new HStoreKey(readkey));
|
||||
!keys.contains(readkey)) {
|
||||
keys.add(new HStoreKey(readkey));
|
||||
}
|
||||
while ((readval = new ImmutableBytesWritable()) != null &&
|
||||
for (readval = new ImmutableBytesWritable();
|
||||
map.next(readkey, readval) &&
|
||||
readkey.matchesRowCol(origin)) {
|
||||
readkey.matchesRowCol(origin);
|
||||
readval = new ImmutableBytesWritable()) {
|
||||
if (!isDeleted(readkey, readval.get(), null, null) &&
|
||||
!allKeys.contains(readkey)) {
|
||||
allKeys.add(new HStoreKey(readkey));
|
||||
if (versions != ALL_VERSIONS && allKeys.size() >= versions) {
|
||||
!keys.contains(readkey)) {
|
||||
keys.add(new HStoreKey(readkey));
|
||||
if (versions != ALL_VERSIONS && keys.size() >= versions) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return allKeys;
|
||||
return keys;
|
||||
} finally {
|
||||
this.lock.releaseReadLock();
|
||||
}
|
||||
|
|
|
@ -50,7 +50,8 @@ public class MiniHBaseCluster implements HConstants {
|
|||
private FileSystem fs;
|
||||
private Path parentdir;
|
||||
private MasterThread masterThread = null;
|
||||
ArrayList<RegionServerThread> regionThreads;
|
||||
ArrayList<RegionServerThread> regionThreads =
|
||||
new ArrayList<RegionServerThread>();
|
||||
private boolean deleteOnExit = true;
|
||||
|
||||
/**
|
||||
|
@ -125,7 +126,7 @@ public class MiniHBaseCluster implements HConstants {
|
|||
this.parentdir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR));
|
||||
fs.mkdirs(parentdir);
|
||||
this.masterThread = startMaster(this.conf);
|
||||
this.regionThreads = startRegionServers(this.conf, nRegionNodes);
|
||||
this.regionThreads.addAll(startRegionServers(this.conf, nRegionNodes));
|
||||
} catch(IOException e) {
|
||||
shutdown();
|
||||
throw e;
|
||||
|
@ -357,17 +358,15 @@ public class MiniHBaseCluster implements HConstants {
|
|||
if(masterThread != null) {
|
||||
masterThread.getMaster().shutdown();
|
||||
}
|
||||
if (regionServerThreads != null) {
|
||||
synchronized(regionServerThreads) {
|
||||
if (regionServerThreads != null) {
|
||||
for(Thread t: regionServerThreads) {
|
||||
if (t.isAlive()) {
|
||||
try {
|
||||
t.join();
|
||||
} catch (InterruptedException e) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
// regionServerThreads can never be null because they are initialized when
|
||||
// the class is constructed.
|
||||
synchronized(regionServerThreads) {
|
||||
for(Thread t: regionServerThreads) {
|
||||
if (t.isAlive()) {
|
||||
try {
|
||||
t.join();
|
||||
} catch (InterruptedException e) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -381,8 +380,7 @@ public class MiniHBaseCluster implements HConstants {
|
|||
}
|
||||
LOG.info("Shutdown " +
|
||||
((masterThread != null)? masterThread.getName(): "0 masters") + " " +
|
||||
((regionServerThreads == null)? 0: regionServerThreads.size()) +
|
||||
" region server(s)");
|
||||
regionServerThreads.size() + " region server(s)");
|
||||
}
|
||||
|
||||
void shutdown() {
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.apache.hadoop.hbase;
|
|||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.Map;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
@ -97,7 +98,7 @@ public class TestHMemcache extends TestCase {
|
|||
|
||||
// Save off old state.
|
||||
int oldHistorySize = hmc.history.size();
|
||||
TreeMap<HStoreKey, byte []> oldMemcache = hmc.memcache;
|
||||
SortedMap<HStoreKey, byte []> oldMemcache = hmc.memcache;
|
||||
// Run snapshot.
|
||||
Snapshot s = hmc.snapshotMemcacheForLog(log);
|
||||
// Make some assertions about what just happened.
|
||||
|
|
Loading…
Reference in New Issue