diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index c0b0fa83f28..8208abfafc6 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -4226,6 +4226,10 @@ public class HRegion implements HeapSize { // , Writable{
         }
       }
 
+      long txid = 0;
+      boolean walSyncSuccessful = false;
+      boolean locked = false;
+
       // 2. acquire the row lock(s)
       acquiredLocks = new ArrayList<Integer>(rowsToLock.size());
       for (byte[] row : rowsToLock) {
@@ -4240,6 +4244,7 @@ public class HRegion implements HeapSize { // , Writable{
 
       // 3. acquire the region lock
       this.updatesLock.readLock().lock();
+      locked = true;
 
       // 4. Get a mvcc write number
       MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
@@ -4268,10 +4273,12 @@ public class HRegion implements HeapSize { // , Writable{
           }
         }
 
-        // 6. append/sync all edits at once
-        // TODO: Do batching as in doMiniBatchPut
-        this.log.append(regionInfo, this.htableDescriptor.getName(), walEdit,
-          HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor);
+        // 6. append all edits at once (don't sync)
+        if (walEdit.size() > 0) {
+          txid = this.log.appendNoSync(regionInfo,
+            this.htableDescriptor.getName(), walEdit,
+            HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor);
+        }
 
         // 7. apply to memstore
         long addedSize = 0;
@@ -4279,32 +4286,79 @@ public class HRegion implements HeapSize { // , Writable{
           addedSize += applyFamilyMapToMemstore(m.getFamilyMap(), w);
         }
         flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
 
-      } finally {
-        // 8. roll mvcc forward
-        mvcc.completeMemstoreInsert(w);
-        // 9. release region lock
+        // 8. release region and row lock(s)
         this.updatesLock.readLock().unlock();
-      }
-      // 10. run all coprocessor post hooks, after region lock is released
-      if (coprocessorHost != null) {
-        for (Mutation m : mutations) {
-          if (m instanceof Put) {
-            coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
-          } else if (m instanceof Delete) {
-            coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL());
+        locked = false;
+        if (acquiredLocks != null) {
+          for (Integer lid : acquiredLocks) {
+            releaseRowLock(lid);
+          }
+          acquiredLocks = null;
+        }
+
+        // 9. sync WAL if required
+        if (walEdit.size() > 0 &&
+            (this.regionInfo.isMetaRegion() ||
+            !this.htableDescriptor.isDeferredLogFlush())) {
+          this.log.sync(txid);
+        }
+        walSyncSuccessful = true;
+
+        // 10. advance mvcc
+        mvcc.completeMemstoreInsert(w);
+        w = null;
+
+        // 11. run coprocessor post host hooks
+        // after the WAL is sync'ed and all locks are released
+        // (similar to doMiniBatchPut)
+        if (coprocessorHost != null) {
+          for (Mutation m : mutations) {
+            if (m instanceof Put) {
+              coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
+            } else if (m instanceof Delete) {
+              coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL());
+            }
+          }
+        }
+      } finally {
+        // 12. clean up if needed
+        if (!walSyncSuccessful) {
+          int kvsRolledback = 0;
+          for (Mutation m : mutations) {
+            for (Map.Entry<byte[], List<KeyValue>> e : m.getFamilyMap()
+                .entrySet()) {
+              List<KeyValue> kvs = e.getValue();
+              byte[] family = e.getKey();
+              Store store = getStore(family);
+              // roll back each kv
+              for (KeyValue kv : kvs) {
+                store.rollback(kv);
+                kvsRolledback++;
+              }
+            }
+          }
+          LOG.info("mutateRowWithLocks: rolled back " + kvsRolledback +
+              " KeyValues");
+        }
+
+        if (w != null) {
+          mvcc.completeMemstoreInsert(w);
+        }
+
+        if (locked) {
+          this.updatesLock.readLock().unlock();
+        }
+
+        if (acquiredLocks != null) {
+          for (Integer lid : acquiredLocks) {
+            releaseRowLock(lid);
           }
         }
       }
     } finally {
-      if (acquiredLocks != null) {
-        // 11. release the row lock
-        for (Integer lid : acquiredLocks) {
-          releaseRowLock(lid);
-        }
-      }
       if (flush) {
-        // 12. Flush cache if needed. Do it outside update lock.
+        // 13. Flush cache if needed. Do it outside update lock.
        requestFlush();
      }
      closeRegionOperation();
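
For reviewers who want the new control flow in isolation, the following is a minimal, self-contained sketch of the pattern the patch applies to the code path above: append to the WAL without syncing while the locks are held, release the locks, sync, and roll the in-memory edits back if the sync never completed. The WriteAheadLog and MemStore interfaces below are hypothetical stand-ins rather than HBase APIs, and the MVCC and coprocessor steps are omitted; only the ordering and the rollback-on-failure bookkeeping mirror the diff.

// Sketch only; not HBase code. The interfaces are invented for illustration.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class DeferredSyncSketch {

  interface WriteAheadLog {
    long appendNoSync(List<String> edits);  // queue edits, return a txid
    void sync(long txid) throws Exception;  // force edits up to txid to disk
  }

  interface MemStore {
    void apply(String edit);                // make an edit visible in memory
    void rollback(String edit);             // undo a previously applied edit
  }

  private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();

  public void mutateRows(List<String> edits, WriteAheadLog log, MemStore memstore)
      throws Exception {
    long txid = 0;
    boolean walSyncSuccessful = false;
    boolean locked = false;
    List<String> applied = new ArrayList<String>();
    try {
      updatesLock.readLock().lock();
      locked = true;

      // append all edits in one batch, but do not force them to disk yet
      if (!edits.isEmpty()) {
        txid = log.appendNoSync(edits);
      }

      // apply to the in-memory store while the lock is still held
      for (String edit : edits) {
        memstore.apply(edit);
        applied.add(edit);
      }

      // release the lock before the potentially slow sync
      updatesLock.readLock().unlock();
      locked = false;

      // make the appended edits durable
      if (!edits.isEmpty()) {
        log.sync(txid);
      }
      walSyncSuccessful = true;
    } finally {
      // clean up: if the sync did not complete, undo the in-memory edits so the
      // memstore does not retain changes that were never made durable
      if (!walSyncSuccessful) {
        for (String edit : applied) {
          memstore.rollback(edit);
        }
      }
      if (locked) {
        updatesLock.readLock().unlock();
      }
    }
  }
}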