HBASE-5541 Avoid holding the rowlock during HLog sync in HRegion.mutateRowWithLocks
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1298678 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7ce77fc5d3
commit
5d9f98e09d
|
@ -4226,6 +4226,10 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
}
|
||||
|
||||
long txid = 0;
|
||||
boolean walSyncSuccessful = false;
|
||||
boolean locked = false;
|
||||
|
||||
// 2. acquire the row lock(s)
|
||||
acquiredLocks = new ArrayList<Integer>(rowsToLock.size());
|
||||
for (byte[] row : rowsToLock) {
|
||||
|
@ -4240,6 +4244,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
|
||||
// 3. acquire the region lock
|
||||
this.updatesLock.readLock().lock();
|
||||
locked = true;
|
||||
|
||||
// 4. Get a mvcc write number
|
||||
MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
|
||||
|
@ -4268,10 +4273,12 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
}
|
||||
|
||||
// 6. append/sync all edits at once
|
||||
// TODO: Do batching as in doMiniBatchPut
|
||||
this.log.append(regionInfo, this.htableDescriptor.getName(), walEdit,
|
||||
HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor);
|
||||
// 6. append all edits at once (don't sync)
|
||||
if (walEdit.size() > 0) {
|
||||
txid = this.log.appendNoSync(regionInfo,
|
||||
this.htableDescriptor.getName(), walEdit,
|
||||
HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor);
|
||||
}
|
||||
|
||||
// 7. apply to memstore
|
||||
long addedSize = 0;
|
||||
|
@ -4279,32 +4286,79 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
addedSize += applyFamilyMapToMemstore(m.getFamilyMap(), w);
|
||||
}
|
||||
flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
|
||||
} finally {
|
||||
// 8. roll mvcc forward
|
||||
mvcc.completeMemstoreInsert(w);
|
||||
|
||||
// 9. release region lock
|
||||
// 8. release region and row lock(s)
|
||||
this.updatesLock.readLock().unlock();
|
||||
}
|
||||
// 10. run all coprocessor post hooks, after region lock is released
|
||||
if (coprocessorHost != null) {
|
||||
for (Mutation m : mutations) {
|
||||
if (m instanceof Put) {
|
||||
coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
|
||||
} else if (m instanceof Delete) {
|
||||
coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL());
|
||||
locked = false;
|
||||
if (acquiredLocks != null) {
|
||||
for (Integer lid : acquiredLocks) {
|
||||
releaseRowLock(lid);
|
||||
}
|
||||
acquiredLocks = null;
|
||||
}
|
||||
|
||||
// 9. sync WAL if required
|
||||
if (walEdit.size() > 0 &&
|
||||
(this.regionInfo.isMetaRegion() ||
|
||||
!this.htableDescriptor.isDeferredLogFlush())) {
|
||||
this.log.sync(txid);
|
||||
}
|
||||
walSyncSuccessful = true;
|
||||
|
||||
// 10. advance mvcc
|
||||
mvcc.completeMemstoreInsert(w);
|
||||
w = null;
|
||||
|
||||
// 11. run coprocessor post host hooks
|
||||
// after the WAL is sync'ed and all locks are released
|
||||
// (similar to doMiniBatchPut)
|
||||
if (coprocessorHost != null) {
|
||||
for (Mutation m : mutations) {
|
||||
if (m instanceof Put) {
|
||||
coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
|
||||
} else if (m instanceof Delete) {
|
||||
coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL());
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
// 12. clean up if needed
|
||||
if (!walSyncSuccessful) {
|
||||
int kvsRolledback = 0;
|
||||
for (Mutation m : mutations) {
|
||||
for (Map.Entry<byte[], List<KeyValue>> e : m.getFamilyMap()
|
||||
.entrySet()) {
|
||||
List<KeyValue> kvs = e.getValue();
|
||||
byte[] family = e.getKey();
|
||||
Store store = getStore(family);
|
||||
// roll back each kv
|
||||
for (KeyValue kv : kvs) {
|
||||
store.rollback(kv);
|
||||
kvsRolledback++;
|
||||
}
|
||||
}
|
||||
}
|
||||
LOG.info("mutateRowWithLocks: rolled back " + kvsRolledback
|
||||
+ " KeyValues");
|
||||
}
|
||||
|
||||
if (w != null) {
|
||||
mvcc.completeMemstoreInsert(w);
|
||||
}
|
||||
|
||||
if (locked) {
|
||||
this.updatesLock.readLock().unlock();
|
||||
}
|
||||
|
||||
if (acquiredLocks != null) {
|
||||
for (Integer lid : acquiredLocks) {
|
||||
releaseRowLock(lid);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (acquiredLocks != null) {
|
||||
// 11. release the row lock
|
||||
for (Integer lid : acquiredLocks) {
|
||||
releaseRowLock(lid);
|
||||
}
|
||||
}
|
||||
if (flush) {
|
||||
// 12. Flush cache if needed. Do it outside update lock.
|
||||
// 13. Flush cache if needed. Do it outside update lock.
|
||||
requestFlush();
|
||||
}
|
||||
closeRegionOperation();
|
||||
|
|
Loading…
Reference in New Issue