diff --git a/CHANGES.txt b/CHANGES.txt index 5c1f773b3a0..4af03314d05 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -30,6 +30,8 @@ Release 0.91.0 - Unreleased HBASE-3456 Fix hardcoding of 20 second socket timeout down in HBaseClient HBASE-3476 HFile -m option need not scan key values (Prakash Khemani via Lars George) + HBASE-2481 max seq id in flushed file can be larger than its correct value + causing data loss during recovery IMPROVEMENTS diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index ea3046bbd46..c760c1777ad 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1481,6 +1481,7 @@ public class HRegion implements HeapSize { // , Writable{ long now = EnvironmentEdgeManager.currentTimeMillis(); byte[] byteNow = Bytes.toBytes(now); + boolean locked = false; /** Keep track of the locks we hold so we can release them in finally clause */ List acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); @@ -1547,6 +1548,10 @@ public class HRegion implements HeapSize { // , Writable{ byteNow); } + + this.updatesLock.readLock().lock(); + locked = true; + // ------------------------------------ // STEP 3. Write to WAL // ---------------------------------- @@ -1589,6 +1594,9 @@ public class HRegion implements HeapSize { // , Writable{ success = true; return addedSize; } finally { + if (locked) + this.updatesLock.readLock().unlock(); + for (Integer toRelease : acquiredLocks) { releaseRowLock(toRelease); } @@ -3156,6 +3164,7 @@ public class HRegion implements HeapSize { // , Writable{ startRegionOperation(); try { Integer lid = getLock(lockid, row, true); + this.updatesLock.readLock().lock(); try { // Process each family for (Map.Entry> family : @@ -3211,6 +3220,7 @@ public class HRegion implements HeapSize { // , Writable{ size = this.memstoreSize.addAndGet(size); flush = isFlushSize(size); } finally { + this.updatesLock.readLock().unlock(); releaseRowLock(lid); } } finally { @@ -3244,6 +3254,7 @@ public class HRegion implements HeapSize { // , Writable{ startRegionOperation(); try { Integer lid = obtainRowLock(row); + this.updatesLock.readLock().lock(); try { Store store = stores.get(family); @@ -3284,6 +3295,7 @@ public class HRegion implements HeapSize { // , Writable{ size = this.memstoreSize.addAndGet(size); flush = isFlushSize(size); } finally { + this.updatesLock.readLock().unlock(); releaseRowLock(lid); } } finally {