Revert "HBASE-16698 Performance issue: handlers stuck waiting for CountDownLatch inside WALKey#getWriteEntry under high writing workload"

Premature commit. Revert while discussion still ongoing.

This reverts commit 9b13514483.
This commit is contained in:
Michael Stack 2016-10-14 11:27:05 -07:00
parent f555b5be9c
commit 13baf4d37a
4 changed files with 28 additions and 107 deletions

View File

@ -64,7 +64,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -198,10 +197,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY = public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
"hbase.hregion.scan.loadColumnFamiliesOnDemand"; "hbase.hregion.scan.loadColumnFamiliesOnDemand";
/** Config key for using mvcc pre-assign feature for put */
public static final String HREGION_MVCC_PRE_ASSIGN = "hbase.hregion.mvcc.preassign";
public static final boolean DEFAULT_HREGION_MVCC_PRE_ASSIGN = true;
/** /**
* This is the global default value for durability. All tables/mutations not * This is the global default value for durability. All tables/mutations not
* defining a durability or using USE_DEFAULT will default to this value. * defining a durability or using USE_DEFAULT will default to this value.
@ -590,9 +585,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// that has non-default scope // that has non-default scope
private final NavigableMap<byte[], Integer> replicationScope = new TreeMap<byte[], Integer>( private final NavigableMap<byte[], Integer> replicationScope = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR); Bytes.BYTES_COMPARATOR);
// flag and lock for MVCC preassign
private final boolean mvccPreAssign;
private final ReentrantLock preAssignMvccLock;
/** /**
* HRegion constructor. This constructor should only be used for testing and * HRegion constructor. This constructor should only be used for testing and
@ -752,14 +744,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
false : false :
conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
// get mvcc pre-assign flag and lock
this.mvccPreAssign = conf.getBoolean(HREGION_MVCC_PRE_ASSIGN, DEFAULT_HREGION_MVCC_PRE_ASSIGN);
if (this.mvccPreAssign) {
this.preAssignMvccLock = new ReentrantLock();
} else {
this.preAssignMvccLock = null;
}
} }
void setHTableSpecificConf() { void setHTableSpecificConf() {
@ -3230,61 +3214,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// STEP 4. Append the final edit to WAL and sync. // STEP 4. Append the final edit to WAL and sync.
Mutation mutation = batchOp.getMutation(firstIndex); Mutation mutation = batchOp.getMutation(firstIndex);
WALKey walKey = null; WALKey walKey = null;
long txid;
if (replay) { if (replay) {
// use wal key from the original // use wal key from the original
walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(), walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId()); walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
if (!walEdit.isEmpty()) { }
txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true); // Not sure what is going on here when replay is going on... does the below append get
if (txid != 0) { // called for replayed edits? Am afraid to change it without test.
sync(txid, durability); if (!walEdit.isEmpty()) {
} if (!replay) {
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc,
this.getReplicationScope());
} }
} else { // TODO: Use the doAppend methods below... complicated by the replay stuff above.
try { try {
if (!walEdit.isEmpty()) { long txid = this.wal.append(this.getRegionInfo(), walKey,
try { walEdit, true);
if (this.mvccPreAssign) { if (txid != 0) sync(txid, durability);
preAssignMvccLock.lock(); writeEntry = walKey.getWriteEntry();
writeEntry = mvcc.begin();
}
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc,
this.getReplicationScope());
if (this.mvccPreAssign) {
walKey.setPreAssignedWriteEntry(writeEntry);
}
// TODO: Use the doAppend methods below... complicated by the replay stuff above.
txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
} finally {
if (mvccPreAssign) {
preAssignMvccLock.unlock();
}
}
if (txid != 0) {
sync(txid, durability);
}
if (writeEntry == null) {
// if MVCC not preassigned, wait here until assigned
writeEntry = walKey.getWriteEntry();
}
}
} catch (IOException ioe) { } catch (IOException ioe) {
if (walKey != null && writeEntry == null) { if (walKey != null) mvcc.complete(walKey.getWriteEntry());
// the writeEntry is not preassigned and error occurred during append or sync
mvcc.complete(walKey.getWriteEntry());
}
throw ioe; throw ioe;
} }
} }
if (walKey == null) { if (walKey == null) {
// If no walKey, then not in replay and skipping WAL or some such. Begin an MVCC transaction // If no walKey, then skipping WAL or some such. Being an mvcc transaction so sequenceid.
// to get sequence id.
writeEntry = mvcc.begin(); writeEntry = mvcc.begin();
} }
@ -3307,9 +3266,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// STEP 6. Complete mvcc. // STEP 6. Complete mvcc.
if (replay) { if (replay) {
this.mvcc.advanceTo(batchOp.getReplaySequenceId()); this.mvcc.advanceTo(batchOp.getReplaySequenceId());
} else { } else if (writeEntry != null/*Can be null if in replay mode*/) {
// writeEntry won't be empty if not in replay mode
assert writeEntry != null;
mvcc.completeAndWait(writeEntry); mvcc.completeAndWait(writeEntry);
writeEntry = null; writeEntry = null;
} }
@ -7637,9 +7594,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final long FIXED_OVERHEAD = ClassSize.align( public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT + ClassSize.OBJECT +
ClassSize.ARRAY + ClassSize.ARRAY +
50 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + 49 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
(14 * Bytes.SIZEOF_LONG) + (14 * Bytes.SIZEOF_LONG) +
6 * Bytes.SIZEOF_BOOLEAN); 5 * Bytes.SIZEOF_BOOLEAN);
// woefully out of date - currently missing: // woefully out of date - currently missing:
// 1 x HashMap - coprocessorServiceHandlers // 1 x HashMap - coprocessorServiceHandlers

View File

@ -112,16 +112,11 @@ class FSWALEntry extends Entry {
} }
stamped = true; stamped = true;
long regionSequenceId = WALKey.NO_SEQUENCE_ID; long regionSequenceId = WALKey.NO_SEQUENCE_ID;
WALKey key = getKey(); MultiVersionConcurrencyControl mvcc = getKey().getMvcc();
MultiVersionConcurrencyControl.WriteEntry we = key.getPreAssignedWriteEntry(); MultiVersionConcurrencyControl.WriteEntry we = null;
boolean preAssigned = (we != null);
if (!preAssigned) { if (mvcc != null) {
MultiVersionConcurrencyControl mvcc = key.getMvcc(); we = mvcc.begin();
if (mvcc != null) {
we = mvcc.begin();
}
}
if (we != null) {
regionSequenceId = we.getWriteNumber(); regionSequenceId = we.getWriteNumber();
} }
@ -130,9 +125,7 @@ class FSWALEntry extends Entry {
CellUtil.setSequenceId(c, regionSequenceId); CellUtil.setSequenceId(c, regionSequenceId);
} }
} }
if (!preAssigned) { getKey().setWriteEntry(we);
key.setWriteEntry(we);
}
return regionSequenceId; return regionSequenceId;
} }

View File

@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FamilyScope; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FamilyScope;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ScopeType; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ScopeType;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.SequenceId; import org.apache.hadoop.hbase.regionserver.SequenceId;
// imports for things that haven't moved from regionserver.wal yet. // imports for things that haven't moved from regionserver.wal yet.
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext; import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
@ -93,10 +92,6 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
*/ */
@InterfaceAudience.Private // For internal use only. @InterfaceAudience.Private // For internal use only.
public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws InterruptedIOException { public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws InterruptedIOException {
if (this.preAssignedWriteEntry != null) {
// don't wait for seqNumAssignedLatch if writeEntry is preassigned
return this.preAssignedWriteEntry;
}
try { try {
this.sequenceIdAssignedLatch.await(); this.sequenceIdAssignedLatch.await();
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
@ -208,7 +203,6 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
* Set in a way visible to multiple threads; e.g. synchronized getter/setters. * Set in a way visible to multiple threads; e.g. synchronized getter/setters.
*/ */
private MultiVersionConcurrencyControl.WriteEntry writeEntry; private MultiVersionConcurrencyControl.WriteEntry writeEntry;
private MultiVersionConcurrencyControl.WriteEntry preAssignedWriteEntry = null;
public static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>()); public static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
// visible for deprecated HLogKey // visible for deprecated HLogKey
@ -737,24 +731,4 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
this.origLogSeqNum = walKey.getOrigSequenceNumber(); this.origLogSeqNum = walKey.getOrigSequenceNumber();
} }
} }
/**
* @return The preassigned writeEntry, if any
*/
@InterfaceAudience.Private // For internal use only.
public MultiVersionConcurrencyControl.WriteEntry getPreAssignedWriteEntry() {
return this.preAssignedWriteEntry;
}
/**
* Preassign writeEntry
* @param writeEntry the entry to assign
*/
@InterfaceAudience.Private // For internal use only.
public void setPreAssignedWriteEntry(WriteEntry writeEntry) {
if (writeEntry != null) {
this.preAssignedWriteEntry = writeEntry;
this.sequenceId = writeEntry.getWriteNumber();
}
}
} }

View File

@ -6290,11 +6290,8 @@ public class TestHRegion {
@Override @Override
public Long answer(InvocationOnMock invocation) throws Throwable { public Long answer(InvocationOnMock invocation) throws Throwable {
WALKey key = invocation.getArgumentAt(1, WALKey.class); WALKey key = invocation.getArgumentAt(1, WALKey.class);
MultiVersionConcurrencyControl.WriteEntry we = key.getPreAssignedWriteEntry(); MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
if (we == null) { key.setWriteEntry(we);
we = key.getMvcc().begin();
key.setWriteEntry(we);
}
return 1L; return 1L;
} }