diff --git a/CHANGES.txt b/CHANGES.txt
index 42fa3f8290b..cf5f737b161 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -47,7 +47,7 @@ Trunk (unreleased changes)
     HADOOP-1847 Many HBase tests do not fail well. (phase 2)
     HADOOP-1870 Once file system failure has been detected, don't check it
                 again and get on with shutting down the hbase cluster.
-    HADOOP-1888 NullPointerException in HMemcacheScanner
+    HADOOP-1888 NullPointerException in HMemcacheScanner (reprise)
     HADOOP-1903 Possible data loss if Exception happens between snapshot and
                 flush to disk.
diff --git a/src/java/org/apache/hadoop/hbase/HMemcache.java b/src/java/org/apache/hadoop/hbase/HMemcache.java
index baa2a5e9840..852582733d7 100644
--- a/src/java/org/apache/hadoop/hbase/HMemcache.java
+++ b/src/java/org/apache/hadoop/hbase/HMemcache.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -43,10 +44,10 @@ public class HMemcache {
   // Note that since these structures are always accessed with a lock held,
   // no additional synchronization is required.
 
-  TreeMap<HStoreKey, byte []> memcache = new TreeMap<HStoreKey, byte []>();
-  final ArrayList<TreeMap<HStoreKey, byte []>> history =
-    new ArrayList<TreeMap<HStoreKey, byte []>>();
-  TreeMap<HStoreKey, byte []> snapshot = null;
+  volatile SortedMap<HStoreKey, byte []> memcache;
+  List<SortedMap<HStoreKey, byte []>> history =
+    Collections.synchronizedList(new ArrayList<SortedMap<HStoreKey, byte []>>());
+  volatile SortedMap<HStoreKey, byte []> snapshot = null;
 
   final HLocking lock = new HLocking();
 
@@ -62,14 +63,16 @@ public class HMemcache {
    */
   public HMemcache() {
     super();
+    memcache =
+      Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
   }
 
   /** represents the state of the memcache at a specified point in time */
   static class Snapshot {
-    final TreeMap<HStoreKey, byte []> memcacheSnapshot;
+    final SortedMap<HStoreKey, byte []> memcacheSnapshot;
     final long sequenceId;
 
-    Snapshot(final TreeMap<HStoreKey, byte []> memcache, final Long i) {
+    Snapshot(final SortedMap<HStoreKey, byte []> memcache, final Long i) {
       super();
       this.memcacheSnapshot = memcache;
       this.sequenceId = i.longValue();
@@ -103,8 +106,11 @@ public class HMemcache {
         new Snapshot(memcache, Long.valueOf(log.startCacheFlush()));
       // From here on, any failure is catastrophic requiring replay of hlog
       this.snapshot = memcache;
-      history.add(memcache);
-      memcache = new TreeMap<HStoreKey, byte []>();
+      synchronized (history) {
+        history.add(memcache);
+      }
+      memcache =
+        Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
       // Reset size of this memcache.
       this.size.set(0);
       return retval;
@@ -126,14 +132,8 @@ public class HMemcache {
       if(snapshot == null) {
         throw new IOException("Snapshot not present!");
       }
-      for (Iterator<TreeMap<HStoreKey, byte []>> it = history.iterator();
-          it.hasNext(); ) {
-
-        TreeMap<HStoreKey, byte []> cur = it.next();
-        if (snapshot == cur) {
-          it.remove();
-          break;
-        }
+      synchronized (history) {
+        history.remove(snapshot);
       }
       this.snapshot = null;
     } finally {
@@ -182,12 +182,14 @@ public class HMemcache {
     this.lock.obtainReadLock();
     try {
       ArrayList<byte []> results = get(memcache, key, numVersions);
-      for (int i = history.size() - 1; i >= 0; i--) {
-        if (numVersions > 0 && results.size() >= numVersions) {
-          break;
+      synchronized (history) {
+        for (int i = history.size() - 1; i >= 0; i--) {
+          if (numVersions > 0 && results.size() >= numVersions) {
+            break;
+          }
+          results.addAll(results.size(),
+            get(history.get(i), key, numVersions - results.size()));
         }
-        results.addAll(results.size(),
-          get(history.get(i), key, numVersions - results.size()));
       }
       return (results.size() == 0) ?
         null : ImmutableBytesWritable.toArray(results);
@@ -210,9 +212,11 @@ public class HMemcache {
     this.lock.obtainReadLock();
     try {
       internalGetFull(memcache, key, results);
-      for (int i = history.size() - 1; i >= 0; i--) {
-        TreeMap<HStoreKey, byte []> cur = history.get(i);
-        internalGetFull(cur, key, results);
+      synchronized (history) {
+        for (int i = history.size() - 1; i >= 0; i--) {
+          SortedMap<HStoreKey, byte []> cur = history.get(i);
+          internalGetFull(cur, key, results);
+        }
       }
       return results;
 
@@ -221,7 +225,7 @@ public class HMemcache {
     }
   }
 
-  void internalGetFull(TreeMap<HStoreKey, byte []> map, HStoreKey key,
+  void internalGetFull(SortedMap<HStoreKey, byte []> map, HStoreKey key,
       TreeMap<Text, byte []> results) {
     SortedMap<HStoreKey, byte []> tailMap = map.tailMap(key);
     for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
@@ -252,7 +256,7 @@ public class HMemcache {
    * @return Ordered list of items found in passed map.  If no
    * matching values, returns an empty list (does not return null).
    */
-  ArrayList<byte []> get(final TreeMap<HStoreKey, byte []> map,
+  ArrayList<byte []> get(final SortedMap<HStoreKey, byte []> map,
       final HStoreKey key, final int numVersions) {
     ArrayList<byte []> result = new ArrayList<byte []>();
     // TODO: If get is of a particular version -- numVersions == 1 -- we
@@ -289,10 +293,12 @@ public class HMemcache {
     this.lock.obtainReadLock();
     try {
       List<HStoreKey> results = getKeys(this.memcache, origin, versions);
-      for (int i = history.size() - 1; i >= 0; i--) {
-        results.addAll(results.size(), getKeys(history.get(i), origin,
-            versions == HConstants.ALL_VERSIONS ? versions :
-              (versions - results.size())));
+      synchronized (history) {
+        for (int i = history.size() - 1; i >= 0; i--) {
+          results.addAll(results.size(), getKeys(history.get(i), origin,
+              versions == HConstants.ALL_VERSIONS ? versions :
+                (versions - results.size())));
+        }
       }
       return results;
     } finally {
@@ -308,7 +314,7 @@ public class HMemcache {
    * equal or older timestamp.  If no keys, returns an empty List. Does not
    * return null.
    */
-  private List<HStoreKey> getKeys(final TreeMap<HStoreKey, byte []> map,
+  private List<HStoreKey> getKeys(final SortedMap<HStoreKey, byte []> map,
       final HStoreKey origin, final int versions) {
     List<HStoreKey> result = new ArrayList<HStoreKey>();
     SortedMap<HStoreKey, byte []> tailMap = map.tailMap(origin);
@@ -360,7 +366,7 @@ public class HMemcache {
   //////////////////////////////////////////////////////////////////////////////
 
   class HMemcacheScanner extends HAbstractScanner {
-    final TreeMap<HStoreKey, byte []> backingMaps[];
+    SortedMap<HStoreKey, byte []> backingMaps[];
     final Iterator<HStoreKey> keyIterators[];
 
     @SuppressWarnings("unchecked")
@@ -370,14 +376,16 @@ public class HMemcache {
       super(timestamp, targetCols);
       lock.obtainReadLock();
       try {
-        this.backingMaps = new TreeMap[history.size() + 1];
+        synchronized (history) {
+          this.backingMaps = new SortedMap[history.size() + 1];
 
-        // Note that since we iterate through the backing maps from 0 to n, we
-        // need to put the memcache first, the newest history second, ..., etc.
+          // Note that since we iterate through the backing maps from 0 to n, we
+          // need to put the memcache first, the newest history second, ..., etc.
 
-        backingMaps[0] = memcache;
-        for (int i = history.size() - 1; i > 0; i--) {
-          backingMaps[i + 1] = history.get(i);
+          backingMaps[0] = memcache;
+          for (int i = history.size() - 1; i >= 0; i--) {
+            backingMaps[i + 1] = history.get(i);
+          }
         }
 
         this.keyIterators = new Iterator[backingMaps.length];
@@ -388,9 +396,13 @@ public class HMemcache {
 
         HStoreKey firstKey = new HStoreKey(firstRow);
         for (int i = 0; i < backingMaps.length; i++) {
-          keyIterators[i] = firstRow.getLength() != 0 ?
-              backingMaps[i].tailMap(firstKey).keySet().iterator() :
-              backingMaps[i].keySet().iterator();
+          if (firstRow != null && firstRow.getLength() != 0) {
+            keyIterators[i] =
+              backingMaps[i].tailMap(firstKey).keySet().iterator();
+
+          } else {
+            keyIterators[i] = backingMaps[i].keySet().iterator();
+          }
 
           while (getNext(i)) {
             if (!findFirstRow(i, firstRow)) {
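
Aside: the HMemcache changes above pair wrapper-synchronized collections (Collections.synchronizedSortedMap, Collections.synchronizedList) with explicit synchronized (history) blocks. The wrappers make individual calls such as add() or put() atomic, but iteration is a compound action the wrapper does not cover; the Collections.synchronizedList javadoc requires callers to hold the list's own monitor while iterating. A minimal, self-contained sketch of the pattern follows; the class and field names are illustrative, not part of the patch.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.SortedMap;
    import java.util.TreeMap;

    public class SynchronizedIterationSketch {
      // Single operations on these wrappers are atomic without extra locking.
      final SortedMap<String, byte[]> memcache =
        Collections.synchronizedSortedMap(new TreeMap<String, byte[]>());
      final List<SortedMap<String, byte[]>> history =
        Collections.synchronizedList(new ArrayList<SortedMap<String, byte[]>>());

      int countEntries() {
        int n = 0;
        // Iteration is a compound action: hold the list's monitor so a
        // concurrent add() or remove() cannot run mid-loop.
        synchronized (history) {
          for (SortedMap<String, byte[]> m : history) {
            n += m.size();
          }
        }
        return n;
      }
    }

Declaring memcache and snapshot volatile covers the remaining piece, visibility of the reference swap itself: after snapshotMemcacheForLog() installs a fresh map, a reader thread sees the new reference rather than a stale one.
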
diff --git a/src/java/org/apache/hadoop/hbase/HRegion.java b/src/java/org/apache/hadoop/hbase/HRegion.java
index a278a95d98e..69911669920 100644
--- a/src/java/org/apache/hadoop/hbase/HRegion.java
+++ b/src/java/org/apache/hadoop/hbase/HRegion.java
@@ -1615,6 +1615,7 @@ public class HRegion implements HConstants {
       return multipleMatchers;
     }
 
+    /** {@inheritDoc} */
     public boolean next(HStoreKey key, SortedMap<Text, byte []> results)
     throws IOException {
       // Filtered flag is set by filters.  If a cell has been 'filtered out'
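
The HRegion change is documentation only. {@inheritDoc} tells the javadoc tool to copy the description from the declaration being overridden, so the implementing method does not ship undocumented. A tiny illustration with hypothetical names:

    interface RowScanner {
      /** Advances to the next row, returning false once the scanner is exhausted. */
      boolean next();
    }

    class MemRowScanner implements RowScanner {
      /** {@inheritDoc} */
      public boolean next() {
        return false; // stub body; the javadoc is inherited from RowScanner#next()
      }
    }
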
diff --git a/src/java/org/apache/hadoop/hbase/HStore.java b/src/java/org/apache/hadoop/hbase/HStore.java
index 5236bc2bbcd..cd88bf4e9d4 100644
--- a/src/java/org/apache/hadoop/hbase/HStore.java
+++ b/src/java/org/apache/hadoop/hbase/HStore.java
@@ -31,6 +31,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.Vector;
 import java.util.Map.Entry;
@@ -439,13 +440,13 @@ class HStore implements HConstants {
    * @param logCacheFlushId flush sequence number
    * @throws IOException
    */
-  void flushCache(final TreeMap<HStoreKey, byte []> inputCache,
+  void flushCache(final SortedMap<HStoreKey, byte []> inputCache,
     final long logCacheFlushId)
   throws IOException {
     flushCacheHelper(inputCache, logCacheFlushId, true);
   }
 
-  void flushCacheHelper(TreeMap<HStoreKey, byte []> inputCache,
+  void flushCacheHelper(SortedMap<HStoreKey, byte []> inputCache,
       long logCacheFlushId, boolean addToAvailableMaps)
   throws IOException {
     synchronized(flushLock) {
@@ -1123,7 +1124,7 @@ class HStore implements HConstants {
    * @param key
    * @param numVersions Number of versions to fetch.  Must be > 0.
    * @param memcache Checked for deletions
-   * @return
+   * @return values for the specified versions
    * @throws IOException
    */
   byte [][] get(HStoreKey key, int numVersions, final HMemcache memcache)
@@ -1171,10 +1172,11 @@ class HStore implements HConstants {
             break;
           }
         }
-        while ((readval = new ImmutableBytesWritable()) != null &&
+        for (readval = new ImmutableBytesWritable();
             map.next(readkey, readval) &&
             readkey.matchesRowCol(key) &&
-            !hasEnoughVersions(numVersions, results)) {
+            !hasEnoughVersions(numVersions, results);
+            readval = new ImmutableBytesWritable()) {
           if (!isDeleted(readkey, readval.get(), memcache, deletes)) {
             results.add(readval.get());
           }
@@ -1212,10 +1214,11 @@ class HStore implements HConstants {
    * @throws IOException
    */
   List<HStoreKey> getKeys(final HStoreKey origin, List<HStoreKey> allKeys,
-    final int versions)
-  throws IOException {
-    if (allKeys == null) {
-      allKeys = new ArrayList<HStoreKey>();
+    final int versions) throws IOException {
+
+    List<HStoreKey> keys = allKeys;
+    if (keys == null) {
+      keys = new ArrayList<HStoreKey>();
     }
     // This code below is very close to the body of the get method.
     this.lock.obtainReadLock();
@@ -1238,23 +1241,24 @@ class HStore implements HConstants {
           }
           continue;
         }
         if (!isDeleted(readkey, readval.get(), null, null) &&
-            !allKeys.contains(readkey)) {
-          allKeys.add(new HStoreKey(readkey));
+            !keys.contains(readkey)) {
+          keys.add(new HStoreKey(readkey));
         }
-        while ((readval = new ImmutableBytesWritable()) != null &&
+        for (readval = new ImmutableBytesWritable();
             map.next(readkey, readval) &&
-            readkey.matchesRowCol(origin)) {
+            readkey.matchesRowCol(origin);
+            readval = new ImmutableBytesWritable()) {
           if (!isDeleted(readkey, readval.get(), null, null) &&
-              !allKeys.contains(readkey)) {
-            allKeys.add(new HStoreKey(readkey));
-            if (versions != ALL_VERSIONS && allKeys.size() >= versions) {
+              !keys.contains(readkey)) {
+            keys.add(new HStoreKey(readkey));
+            if (versions != ALL_VERSIONS && keys.size() >= versions) {
              break;
            }
          }
        }
      }
-      return allKeys;
+      return keys;
     } finally {
       this.lock.releaseReadLock();
     }
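
Two things are going on in the HStore hunks. First, the old loop condition (readval = new ImmutableBytesWritable()) != null is always true, since new never yields null; the rewrite moves the per-pass allocation into a for-loop update clause, where it is still needed because a MapFile-style reader refills the value object it is handed, and reusing one holder would overwrite bytes already collected. Second, getKeys() stops reassigning its allKeys parameter and works on a local copy, keys, which keeps the caller's argument conceptually untouched. A schematic sketch of the fresh-holder idiom; the Reader interface and BytesHolder class here are stand-ins, not HBase APIs:

    import java.util.ArrayList;
    import java.util.List;

    class FreshHolderSketch {
      /** Minimal stand-in for a reusable Writable value container. */
      static class BytesHolder {
        byte[] bytes;
        byte[] get() { return bytes; }
      }

      /** Stand-in for a reader that refills the holder it is given. */
      interface Reader {
        boolean next(BytesHolder val); // fills val, returns false at EOF
      }

      static List<byte[]> readAll(Reader reader) {
        List<byte[]> results = new ArrayList<byte[]>();
        // Allocate a fresh holder on each pass so entries already added to
        // results are not clobbered when the reader refills the holder.
        for (BytesHolder val = new BytesHolder();
            reader.next(val);
            val = new BytesHolder()) {
          results.add(val.get());
        }
        return results;
      }
    }
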
diff --git a/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java b/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
index 93696c5e984..093d3594eb0 100644
--- a/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
+++ b/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
@@ -50,7 +50,8 @@ public class MiniHBaseCluster implements HConstants {
   private FileSystem fs;
   private Path parentdir;
   private MasterThread masterThread = null;
-  ArrayList<RegionServerThread> regionThreads;
+  ArrayList<RegionServerThread> regionThreads =
+    new ArrayList<RegionServerThread>();
   private boolean deleteOnExit = true;
 
   /**
@@ -125,7 +126,7 @@ public class MiniHBaseCluster implements HConstants {
       this.parentdir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR));
       fs.mkdirs(parentdir);
       this.masterThread = startMaster(this.conf);
-      this.regionThreads = startRegionServers(this.conf, nRegionNodes);
+      this.regionThreads.addAll(startRegionServers(this.conf, nRegionNodes));
     } catch(IOException e) {
       shutdown();
       throw e;
@@ -357,17 +358,15 @@ public class MiniHBaseCluster implements HConstants {
     if(masterThread != null) {
       masterThread.getMaster().shutdown();
     }
-    if (regionServerThreads != null) {
-      synchronized(regionServerThreads) {
-        if (regionServerThreads != null) {
-          for(Thread t: regionServerThreads) {
-            if (t.isAlive()) {
-              try {
-                t.join();
-              } catch (InterruptedException e) {
-                // continue
-              }
-            }
+    // regionServerThreads can never be null because they are initialized when
+    // the class is constructed.
+    synchronized(regionServerThreads) {
+      for(Thread t: regionServerThreads) {
+        if (t.isAlive()) {
+          try {
+            t.join();
+          } catch (InterruptedException e) {
+            // continue
           }
         }
       }
@@ -381,8 +380,7 @@ public class MiniHBaseCluster implements HConstants {
     }
     LOG.info("Shutdown " +
       ((masterThread != null)? masterThread.getName(): "0 masters") + " " +
-      ((regionServerThreads == null)? 0: regionServerThreads.size()) +
-      " region server(s)");
+      regionServerThreads.size() + " region server(s)");
   }
 
   void shutdown() {
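
Initializing regionThreads at its declaration means the field can never be null, which is what lets shutdown() drop the nested null checks and synchronize on the list unconditionally. A condensed sketch of the resulting join loop; the class is hypothetical, and the empty catch mirrors the patch (restoring the interrupt flag with Thread.currentThread().interrupt() is a common refinement):

    import java.util.ArrayList;
    import java.util.List;

    class WorkerJoinSketch {
      // Initialized at declaration, so callers never need a null guard.
      private final List<Thread> workers = new ArrayList<Thread>();

      void joinAll() {
        synchronized (workers) {
          for (Thread t : workers) {
            if (t.isAlive()) {
              try {
                t.join();
              } catch (InterruptedException e) {
                // continue joining the remaining threads, as the patch does
              }
            }
          }
        }
      }
    }
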
diff --git a/src/test/org/apache/hadoop/hbase/TestHMemcache.java b/src/test/org/apache/hadoop/hbase/TestHMemcache.java
index c66e09bca28..68c8338ef81 100644
--- a/src/test/org/apache/hadoop/hbase/TestHMemcache.java
+++ b/src/test/org/apache/hadoop/hbase/TestHMemcache.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.Map;
+import java.util.SortedMap;
 import java.util.TreeMap;
 
 import junit.framework.TestCase;
@@ -97,7 +98,7 @@ public class TestHMemcache extends TestCase {
 
     // Save off old state.
     int oldHistorySize = hmc.history.size();
-    TreeMap<HStoreKey, byte []> oldMemcache = hmc.memcache;
+    SortedMap<HStoreKey, byte []> oldMemcache = hmc.memcache;
     // Run snapshot.
     Snapshot s = hmc.snapshotMemcacheForLog(log);
     // Make some assertions about what just happened.