HBASE-16698 Performance issue: handlers stuck waiting for CountDownLatch inside WALKey#getWriteEntry under high writing workload
This commit is contained in:
parent
33e89fa9cf
commit
a7a4e17f1d
@ -62,6 +62,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.commons.lang.RandomStringUtils;
|
||||
@ -206,6 +207,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
|
||||
"hbase.hregion.scan.loadColumnFamiliesOnDemand";
|
||||
|
||||
/** Config key for using mvcc pre-assign feature for put */
|
||||
public static final String HREGION_MVCC_PRE_ASSIGN = "hbase.hregion.mvcc.preassign";
|
||||
public static final boolean DEFAULT_HREGION_MVCC_PRE_ASSIGN = true;
|
||||
|
||||
/**
|
||||
* Longest time we'll wait on a sequenceid.
|
||||
* Sequenceid comes up out of the WAL subsystem. WAL subsystem can go bad or a test might use
|
||||
@ -604,6 +609,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
private final Durability durability;
|
||||
private final boolean regionStatsEnabled;
|
||||
|
||||
// flag and lock for MVCC preassign
|
||||
private final boolean mvccPreAssign;
|
||||
private final ReentrantLock preAssignMvccLock;
|
||||
|
||||
/**
|
||||
* HRegion constructor. This constructor should only be used for testing and
|
||||
* extensions. Instances of HRegion should be instantiated with the
|
||||
@ -753,6 +762,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
false :
|
||||
conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
|
||||
HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
|
||||
|
||||
// get mvcc pre-assign flag and lock
|
||||
this.mvccPreAssign = conf.getBoolean(HREGION_MVCC_PRE_ASSIGN, DEFAULT_HREGION_MVCC_PRE_ASSIGN);
|
||||
if (this.mvccPreAssign) {
|
||||
this.preAssignMvccLock = new ReentrantLock();
|
||||
} else {
|
||||
this.preAssignMvccLock = null;
|
||||
}
|
||||
}
|
||||
|
||||
void setHTableSpecificConf() {
|
||||
@ -2576,7 +2593,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
// a timeout. May happen in tests after we tightened the semantic via HBASE-14317.
|
||||
// Also, the getSequenceId blocks on a latch. There is no global list of outstanding latches
|
||||
// so if an abort or stop, there is no way to call them in.
|
||||
WALKey key = this.appendEmptyEdit(wal);
|
||||
WALKey key = this.appendEmptyEdit(wal, null);
|
||||
mvcc.complete(key.getWriteEntry());
|
||||
return key.getSequenceId(this.maxWaitForSeqId);
|
||||
}
|
||||
@ -3283,25 +3300,44 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
|
||||
long replaySeqId = batchOp.getReplaySequenceId();
|
||||
walKey.setOrigLogSeqNum(replaySeqId);
|
||||
}
|
||||
if (walEdit.size() > 0) {
|
||||
if (!isInReplay) {
|
||||
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
|
||||
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
|
||||
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
|
||||
mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
|
||||
if (walEdit.size() > 0) {
|
||||
txid =
|
||||
this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
if (mvccPreAssign) {
|
||||
preAssignMvccLock.lock();
|
||||
writeEntry = mvcc.begin();
|
||||
}
|
||||
if (walEdit.size() > 0) {
|
||||
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
|
||||
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
|
||||
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
|
||||
mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
|
||||
if (mvccPreAssign) {
|
||||
walKey.setPreAssignedWriteEntry(writeEntry);
|
||||
}
|
||||
txid =
|
||||
this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
|
||||
} else {
|
||||
// If this is a skip wal operation just get the read point from mvcc
|
||||
walKey = this.appendEmptyEdit(this.wal, writeEntry);
|
||||
}
|
||||
} finally {
|
||||
if (mvccPreAssign) {
|
||||
preAssignMvccLock.unlock();
|
||||
}
|
||||
}
|
||||
txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
|
||||
}
|
||||
// ------------------------------------
|
||||
// Acquire the latest mvcc number
|
||||
// ----------------------------------
|
||||
if (walKey == null) {
|
||||
// If this is a skip wal operation just get the read point from mvcc
|
||||
walKey = this.appendEmptyEdit(this.wal);
|
||||
}
|
||||
if (!isInReplay) {
|
||||
writeEntry = walKey.getWriteEntry();
|
||||
if (writeEntry == null) {
|
||||
// we need to wait for mvcc to be assigned here if not preassigned
|
||||
writeEntry = walKey.getWriteEntry();
|
||||
}
|
||||
mvccNum = writeEntry.getWriteNumber();
|
||||
} else {
|
||||
mvccNum = batchOp.getReplaySequenceId();
|
||||
@ -3324,7 +3360,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
// We need to update the sequence id for following reasons.
|
||||
// 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id.
|
||||
// 2) If no WAL, FSWALEntry won't be used
|
||||
boolean updateSeqId = isInReplay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL;
|
||||
// 3) If mvcc preassigned, the asynchronous append may still hasn't run to
|
||||
// FSWALEntry#stampRegionSequenceId and the cell seqId will be 0. So we need to update
|
||||
// before apply to memstore to avoid scan return incorrect value
|
||||
boolean updateSeqId = isInReplay
|
||||
|| batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL || mvccPreAssign;
|
||||
if (updateSeqId) {
|
||||
updateSequenceId(familyMaps[i].values(), mvccNum);
|
||||
}
|
||||
@ -7206,7 +7246,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
if(walKey == null){
|
||||
// since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
|
||||
// to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
|
||||
walKey = this.appendEmptyEdit(this.wal);
|
||||
walKey = this.appendEmptyEdit(this.wal, null);
|
||||
}
|
||||
|
||||
// 7. Start mvcc transaction
|
||||
@ -7531,7 +7571,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
boolean updateSeqId = false;
|
||||
if (walKey == null) {
|
||||
// Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
|
||||
walKey = this.appendEmptyEdit(this.wal);
|
||||
walKey = this.appendEmptyEdit(this.wal, null);
|
||||
// If no WAL, FSWALEntry won't be used and no update for sequence id
|
||||
updateSeqId = true;
|
||||
}
|
||||
@ -7770,7 +7810,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdits, true);
|
||||
} else {
|
||||
// Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
|
||||
walKey = this.appendEmptyEdit(this.wal);
|
||||
walKey = this.appendEmptyEdit(this.wal, null);
|
||||
// If no WAL, FSWALEntry won't be used and no update for sequence id
|
||||
updateSeqId = true;
|
||||
}
|
||||
@ -7991,9 +8031,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
public static final long FIXED_OVERHEAD = ClassSize.align(
|
||||
ClassSize.OBJECT +
|
||||
ClassSize.ARRAY +
|
||||
46 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
|
||||
47 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
|
||||
(14 * Bytes.SIZEOF_LONG) +
|
||||
5 * Bytes.SIZEOF_BOOLEAN);
|
||||
6 * Bytes.SIZEOF_BOOLEAN);
|
||||
|
||||
// woefully out of date - currently missing:
|
||||
// 1 x HashMap - coprocessorServiceHandlers
|
||||
@ -8575,15 +8615,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
* Append a faked WALEdit in order to get a long sequence number and wal syncer will just ignore
|
||||
* the WALEdit append later.
|
||||
* @param wal
|
||||
* @param writeEntry Preassigned writeEntry, if any
|
||||
* @return Return the key used appending with no sync and no append.
|
||||
* @throws IOException
|
||||
*/
|
||||
private WALKey appendEmptyEdit(final WAL wal) throws IOException {
|
||||
private WALKey appendEmptyEdit(final WAL wal, WriteEntry writeEntry) throws IOException {
|
||||
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
|
||||
@SuppressWarnings("deprecation")
|
||||
WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
|
||||
getRegionInfo().getTable(), WALKey.NO_SEQUENCE_ID, 0, null,
|
||||
HConstants.NO_NONCE, HConstants.NO_NONCE, getMVCC());
|
||||
if (writeEntry != null) {
|
||||
key.setPreAssignedWriteEntry(writeEntry);
|
||||
}
|
||||
|
||||
// Call append but with an empty WALEdit. The returned sequence id will not be associated
|
||||
// with any edit and we can be sure it went in after all outstanding appends.
|
||||
|
@ -111,11 +111,16 @@ class FSWALEntry extends Entry {
|
||||
*/
|
||||
long stampRegionSequenceId() throws IOException {
|
||||
long regionSequenceId = WALKey.NO_SEQUENCE_ID;
|
||||
MultiVersionConcurrencyControl mvcc = getKey().getMvcc();
|
||||
MultiVersionConcurrencyControl.WriteEntry we = null;
|
||||
|
||||
if (mvcc != null) {
|
||||
we = mvcc.begin();
|
||||
WALKey key = getKey();
|
||||
MultiVersionConcurrencyControl.WriteEntry we = key.getPreAssignedWriteEntry();
|
||||
boolean preAssigned = (we != null);
|
||||
if (!preAssigned) {
|
||||
MultiVersionConcurrencyControl mvcc = key.getMvcc();
|
||||
if (mvcc != null) {
|
||||
we = mvcc.begin();
|
||||
}
|
||||
}
|
||||
if (we != null) {
|
||||
regionSequenceId = we.getWriteNumber();
|
||||
}
|
||||
|
||||
@ -126,9 +131,9 @@ class FSWALEntry extends Entry {
|
||||
}
|
||||
|
||||
// This has to stay in this order
|
||||
WALKey key = getKey();
|
||||
key.setLogSeqNum(regionSequenceId);
|
||||
key.setWriteEntry(we);
|
||||
if (!preAssigned) {
|
||||
key.setWriteEntry(we);
|
||||
}
|
||||
return regionSequenceId;
|
||||
}
|
||||
|
||||
|
@ -33,6 +33,7 @@ import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
|
||||
import org.apache.hadoop.hbase.util.ByteStringer;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -94,6 +95,10 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
|
||||
*/
|
||||
@InterfaceAudience.Private // For internal use only.
|
||||
public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws InterruptedIOException {
|
||||
if (this.preAssignedWriteEntry != null) {
|
||||
// don't wait for seqNumAssignedLatch if writeEntry is preassigned
|
||||
return this.preAssignedWriteEntry;
|
||||
}
|
||||
try {
|
||||
this.seqNumAssignedLatch.await();
|
||||
} catch (InterruptedException ie) {
|
||||
@ -114,7 +119,12 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
|
||||
|
||||
@InterfaceAudience.Private // For internal use only.
|
||||
public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry) {
|
||||
assert this.writeEntry == null : "Non-null writeEntry when trying to set one";
|
||||
this.writeEntry = writeEntry;
|
||||
// Set our sequenceid now using WriteEntry.
|
||||
if (this.writeEntry != null) {
|
||||
this.logSeqNum = this.writeEntry.getWriteNumber();
|
||||
}
|
||||
this.seqNumAssignedLatch.countDown();
|
||||
}
|
||||
|
||||
@ -196,6 +206,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
|
||||
private long nonce = HConstants.NO_NONCE;
|
||||
private MultiVersionConcurrencyControl mvcc;
|
||||
private MultiVersionConcurrencyControl.WriteEntry writeEntry;
|
||||
private MultiVersionConcurrencyControl.WriteEntry preAssignedWriteEntry = null;
|
||||
public static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
|
||||
|
||||
// visible for deprecated HLogKey
|
||||
@ -359,17 +370,6 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
|
||||
return this.logSeqNum;
|
||||
}
|
||||
|
||||
/**
|
||||
* Allow that the log sequence id to be set post-construction
|
||||
* Only public for org.apache.hadoop.hbase.regionserver.wal.FSWALEntry
|
||||
* @param sequence
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public void setLogSeqNum(final long sequence) {
|
||||
this.logSeqNum = sequence;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to set original seq Id for WALKey during wal replay
|
||||
* @param seqId
|
||||
@ -666,4 +666,24 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
|
||||
this.origLogSeqNum = walKey.getOrigSequenceNumber();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The preassigned writeEntry, if any
|
||||
*/
|
||||
@InterfaceAudience.Private // For internal use only.
|
||||
public MultiVersionConcurrencyControl.WriteEntry getPreAssignedWriteEntry() {
|
||||
return this.preAssignedWriteEntry;
|
||||
}
|
||||
|
||||
/**
|
||||
* Preassign writeEntry
|
||||
* @param writeEntry the entry to assign
|
||||
*/
|
||||
@InterfaceAudience.Private // For internal use only.
|
||||
public void setPreAssignedWriteEntry(WriteEntry writeEntry) {
|
||||
if (writeEntry != null) {
|
||||
this.preAssignedWriteEntry = writeEntry;
|
||||
this.logSeqNum = writeEntry.getWriteNumber();
|
||||
}
|
||||
}
|
||||
}
|
@ -6200,8 +6200,11 @@ public class TestHRegion {
|
||||
@Override
|
||||
public Long answer(InvocationOnMock invocation) throws Throwable {
|
||||
WALKey key = invocation.getArgumentAt(2, WALKey.class);
|
||||
MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
|
||||
key.setWriteEntry(we);
|
||||
MultiVersionConcurrencyControl.WriteEntry we = key.getPreAssignedWriteEntry();
|
||||
if (we == null) {
|
||||
we = key.getMvcc().begin();
|
||||
key.setWriteEntry(we);
|
||||
}
|
||||
return 1L;
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user