HBASE-25458 HRegion methods cleanup (#2838)

Signed-off-by: meiyi <myimeiyi@gmail.com>
This commit is contained in:
Duo Zhang 2021-01-06 15:13:10 +08:00 committed by GitHub
parent a5eb8f1f70
commit bedb45d4ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 126 additions and 179 deletions

View File

@ -913,17 +913,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
} }
void setHTableSpecificConf() { private void setHTableSpecificConf() {
if (this.htableDescriptor == null) return; if (this.htableDescriptor == null) {
return;
}
long flushSize = this.htableDescriptor.getMemStoreFlushSize(); long flushSize = this.htableDescriptor.getMemStoreFlushSize();
if (flushSize <= 0) { if (flushSize <= 0) {
flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE); TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE);
} }
this.memstoreFlushSize = flushSize; this.memstoreFlushSize = flushSize;
long mult = conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, long mult = conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER); HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
this.blockingMemStoreSize = this.memstoreFlushSize * mult; this.blockingMemStoreSize = this.memstoreFlushSize * mult;
} }
@ -1336,7 +1338,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Increase the size of mem store in this region and the size of global mem * Increase the size of mem store in this region and the size of global mem
* store * store
*/ */
void incMemStoreSize(MemStoreSize mss) { private void incMemStoreSize(MemStoreSize mss) {
incMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(), incMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(),
mss.getCellsCount()); mss.getCellsCount());
} }
@ -1356,7 +1358,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
mss.getCellsCount()); mss.getCellsCount());
} }
void decrMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta, private void decrMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta,
int cellsCountDelta) { int cellsCountDelta) {
if (this.rsAccounting != null) { if (this.rsAccounting != null) {
rsAccounting.decGlobalMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta); rsAccounting.decGlobalMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta);
@ -1987,7 +1989,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
} }
protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool( private ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
final String threadNamePrefix) { final String threadNamePrefix) {
int numStores = Math.max(1, this.htableDescriptor.getColumnFamilyCount()); int numStores = Math.max(1, this.htableDescriptor.getColumnFamilyCount());
int maxThreads = Math.min(numStores, int maxThreads = Math.min(numStores,
@ -1996,7 +1998,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix); return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
} }
protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool( ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
final String threadNamePrefix) { final String threadNamePrefix) {
int numStores = Math.max(1, this.htableDescriptor.getColumnFamilyCount()); int numStores = Math.max(1, this.htableDescriptor.getColumnFamilyCount());
int maxThreads = Math.max(1, int maxThreads = Math.max(1,
@ -2006,7 +2008,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix); return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
} }
static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads, private static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
final String threadNamePrefix) { final String threadNamePrefix) {
return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS, return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
new ThreadFactory() { new ThreadFactory() {
@ -2475,11 +2477,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
boolean isCompactionNeeded(); boolean isCompactionNeeded();
} }
public FlushResultImpl flushcache(boolean flushAllStores, boolean writeFlushRequestWalMarker, FlushResultImpl flushcache(boolean flushAllStores, boolean writeFlushRequestWalMarker,
FlushLifeCycleTracker tracker) throws IOException { FlushLifeCycleTracker tracker) throws IOException {
List families = null; List<byte[]> families = null;
if (flushAllStores) { if (flushAllStores) {
families = new ArrayList(); families = new ArrayList<>();
families.addAll(this.getTableDescriptor().getColumnFamilyNames()); families.addAll(this.getTableDescriptor().getColumnFamilyNames());
} }
return this.flushcache(families, writeFlushRequestWalMarker, tracker); return this.flushcache(families, writeFlushRequestWalMarker, tracker);
@ -2960,7 +2962,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY", @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
justification="Intentional; notify is about completed flush") justification="Intentional; notify is about completed flush")
protected FlushResultImpl internalFlushCacheAndCommit(WAL wal, MonitoredTask status, FlushResultImpl internalFlushCacheAndCommit(WAL wal, MonitoredTask status,
PrepareFlushResult prepareResult, Collection<HStore> storesToFlush) throws IOException { PrepareFlushResult prepareResult, Collection<HStore> storesToFlush) throws IOException {
// prepare flush context is carried via PrepareFlushResult // prepare flush context is carried via PrepareFlushResult
TreeMap<byte[], StoreFlushContext> storeFlushCtxs = prepareResult.storeFlushCtxs; TreeMap<byte[], StoreFlushContext> storeFlushCtxs = prepareResult.storeFlushCtxs;
@ -3157,12 +3159,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
} }
protected RegionScanner instantiateRegionScanner(Scan scan,
List<KeyValueScanner> additionalScanners) throws IOException {
return instantiateRegionScanner(scan, additionalScanners, HConstants.NO_NONCE,
HConstants.NO_NONCE);
}
protected RegionScannerImpl instantiateRegionScanner(Scan scan, protected RegionScannerImpl instantiateRegionScanner(Scan scan,
List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException { List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException {
if (scan.isReversed()) { if (scan.isReversed()) {
@ -3177,9 +3173,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/** /**
* Prepare a delete for a row mutation processor * Prepare a delete for a row mutation processor
* @param delete The passed delete is modified by this method. WARNING! * @param delete The passed delete is modified by this method. WARNING!
* @throws IOException
*/ */
public void prepareDelete(Delete delete) throws IOException { private void prepareDelete(Delete delete) throws IOException {
// Check to see if this is a deleteRow insert // Check to see if this is a deleteRow insert
if(delete.getFamilyCellMap().isEmpty()){ if(delete.getFamilyCellMap().isEmpty()){
for(byte [] family : this.htableDescriptor.getColumnFamilyNames()){ for(byte [] family : this.htableDescriptor.getColumnFamilyNames()){
@ -3203,38 +3198,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
startRegionOperation(Operation.DELETE); startRegionOperation(Operation.DELETE);
try { try {
// All edits for the given row (across all column families) must happen atomically. // All edits for the given row (across all column families) must happen atomically.
doBatchMutate(delete); mutate(delete);
} finally { } finally {
closeRegionOperation(Operation.DELETE); closeRegionOperation(Operation.DELETE);
} }
} }
/**
* Row needed by below method.
*/
private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly");
/**
* This is used only by unit tests. Not required to be a public API.
* @param familyMap map of family to edits for the given family.
* @throws IOException
*/
void delete(NavigableMap<byte[], List<Cell>> familyMap,
Durability durability) throws IOException {
Delete delete = new Delete(FOR_UNIT_TESTS_ONLY, HConstants.LATEST_TIMESTAMP, familyMap);
delete.setDurability(durability);
doBatchMutate(delete);
}
/** /**
* Set up correct timestamps in the KVs in Delete object. * Set up correct timestamps in the KVs in Delete object.
* <p>Caller should have the row and region locks. * <p/>
* @param mutation * Caller should have the row and region locks.
* @param familyMap
* @param byteNow
* @throws IOException
*/ */
public void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap, private void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap,
byte[] byteNow) throws IOException { byte[] byteNow) throws IOException {
for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) { for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
@ -3278,7 +3253,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
} }
void updateDeleteLatestVersionTimestamp(Cell cell, Get get, int count, byte[] byteNow) private void updateDeleteLatestVersionTimestamp(Cell cell, Get get, int count, byte[] byteNow)
throws IOException { throws IOException {
List<Cell> result = get(get, false); List<Cell> result = get(get, false);
@ -3306,7 +3281,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
startRegionOperation(Operation.PUT); startRegionOperation(Operation.PUT);
try { try {
// All edits for the given row (across all column families) must happen atomically. // All edits for the given row (across all column families) must happen atomically.
doBatchMutate(put); mutate(put);
} finally { } finally {
closeRegionOperation(Operation.PUT); closeRegionOperation(Operation.PUT);
} }
@ -3353,7 +3328,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Visitor interface for batch operations * Visitor interface for batch operations
*/ */
@FunctionalInterface @FunctionalInterface
public interface Visitor { interface Visitor {
/** /**
* @param index operation index * @param index operation index
* @return If true continue visiting remaining entries, break otherwise * @return If true continue visiting remaining entries, break otherwise
@ -3759,14 +3734,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/** /**
* Batch of mutation operations. Base class is shared with {@link ReplayBatchOperation} as most * Batch of mutation operations. Base class is shared with {@link ReplayBatchOperation} as most of
* of the logic is same. * the logic is same.
*/ */
static class MutationBatchOperation extends BatchOperation<Mutation> { private static class MutationBatchOperation extends BatchOperation<Mutation> {
private long nonceGroup; private long nonceGroup;
private long nonce; private long nonce;
public MutationBatchOperation(final HRegion region, Mutation[] operations, boolean atomic, public MutationBatchOperation(final HRegion region, Mutation[] operations, boolean atomic,
long nonceGroup, long nonce) { long nonceGroup, long nonce) {
super(region, operations); super(region, operations);
this.atomic = atomic; this.atomic = atomic;
this.nonceGroup = nonceGroup; this.nonceGroup = nonceGroup;
@ -4401,10 +4379,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Batch of mutations for replay. Base class is shared with {@link MutationBatchOperation} as most * Batch of mutations for replay. Base class is shared with {@link MutationBatchOperation} as most
* of the logic is same. * of the logic is same.
*/ */
static class ReplayBatchOperation extends BatchOperation<MutationReplay> { private static final class ReplayBatchOperation extends BatchOperation<MutationReplay> {
private long origLogSeqNum = 0; private long origLogSeqNum = 0;
public ReplayBatchOperation(final HRegion region, MutationReplay[] operations, public ReplayBatchOperation(final HRegion region, MutationReplay[] operations,
long origLogSeqNum) { long origLogSeqNum) {
super(region, operations); super(region, operations);
this.origLogSeqNum = origLogSeqNum; this.origLogSeqNum = origLogSeqNum;
} }
@ -4512,12 +4492,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
} }
public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long nonceGroup, private OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long nonceGroup,
long nonce) throws IOException { long nonce) throws IOException {
// As it stands, this is used for 3 things // As it stands, this is used for 3 things
// * batchMutate with single mutation - put/delete/increment/append, separate or from // * batchMutate with single mutation - put/delete/increment/append, separate or from
// checkAndMutate. // checkAndMutate.
// * coprocessor calls (see ex. BulkDeleteEndpoint). // * coprocessor calls (see ex. BulkDeleteEndpoint).
// So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd... // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
return batchMutate(new MutationBatchOperation(this, mutations, atomic, nonceGroup, nonce)); return batchMutate(new MutationBatchOperation(this, mutations, atomic, nonceGroup, nonce));
} }
@ -4525,8 +4505,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override @Override
public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException { public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
// If the mutations has any Increment/Append operations, we need to do batchMutate atomically // If the mutations has any Increment/Append operations, we need to do batchMutate atomically
boolean atomic = Arrays.stream(mutations) boolean atomic =
.anyMatch(m -> m instanceof Increment || m instanceof Append); Arrays.stream(mutations).anyMatch(m -> m instanceof Increment || m instanceof Append);
return batchMutate(mutations, atomic);
}
OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic) throws IOException {
return batchMutate(mutations, atomic, HConstants.NO_NONCE, HConstants.NO_NONCE); return batchMutate(mutations, atomic, HConstants.NO_NONCE, HConstants.NO_NONCE);
} }
@ -4556,24 +4540,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/** /**
* Perform a batch of mutations. * Perform a batch of mutations.
* * <p/>
* Operations in a batch are stored with highest durability specified of for all operations in a * Operations in a batch are stored with highest durability specified of for all operations in a
* batch, except for {@link Durability#SKIP_WAL}. * batch, except for {@link Durability#SKIP_WAL}.
* * <p/>
* <p>This function is called from {@link #batchReplay(WALSplitUtil.MutationReplay[], long)} with * This function is called from {@link #batchReplay(WALSplitUtil.MutationReplay[], long)} with
* {@link ReplayBatchOperation} instance and {@link #batchMutate(Mutation[])} with * {@link ReplayBatchOperation} instance and {@link #batchMutate(Mutation[])} with
* {@link MutationBatchOperation} instance as an argument. As the processing of replay batch * {@link MutationBatchOperation} instance as an argument. As the processing of replay batch and
* and mutation batch is very similar, lot of code is shared by providing generic methods in * mutation batch is very similar, lot of code is shared by providing generic methods in base
* base class {@link BatchOperation}. The logic for this method and * class {@link BatchOperation}. The logic for this method and
* {@link #doMiniBatchMutate(BatchOperation)} is implemented using methods in base class which * {@link #doMiniBatchMutate(BatchOperation)} is implemented using methods in base class which are
* are overridden by derived classes to implement special behavior. * overridden by derived classes to implement special behavior.
*
* @param batchOp contains the list of mutations * @param batchOp contains the list of mutations
* @return an array of OperationStatus which internally contains the * @return an array of OperationStatus which internally contains the OperationStatusCode and the
* OperationStatusCode and the exceptionMessage if any. * exceptionMessage if any.
* @throws IOException if an IO problem is encountered * @throws IOException if an IO problem is encountered
*/ */
OperationStatus[] batchMutate(BatchOperation<?> batchOp) throws IOException { private OperationStatus[] batchMutate(BatchOperation<?> batchOp) throws IOException {
boolean initialized = false; boolean initialized = false;
batchOp.startRegionOperation(); batchOp.startRegionOperation();
try { try {
@ -4727,7 +4710,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Returns effective durability from the passed durability and * Returns effective durability from the passed durability and
* the table descriptor. * the table descriptor.
*/ */
protected Durability getEffectiveDurability(Durability d) { private Durability getEffectiveDurability(Durability d) {
return d == Durability.USE_DEFAULT ? this.regionDurability : d; return d == Durability.USE_DEFAULT ? this.regionDurability : d;
} }
@ -4916,7 +4899,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// All edits for the given row (across all column families) must happen atomically. // All edits for the given row (across all column families) must happen atomically.
Result r; Result r;
if (mutation != null) { if (mutation != null) {
r = doBatchMutate(mutation, true).getResult(); r = mutate(mutation, true).getResult();
} else { } else {
r = mutateRow(rowMutations); r = mutateRow(rowMutations);
} }
@ -4976,27 +4959,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return matches; return matches;
} }
private OperationStatus doBatchMutate(Mutation mutation) throws IOException { private OperationStatus mutate(Mutation mutation) throws IOException {
return doBatchMutate(mutation, false); return mutate(mutation, false);
} }
private OperationStatus doBatchMutate(Mutation mutation, boolean atomic) throws IOException { private OperationStatus mutate(Mutation mutation, boolean atomic) throws IOException {
return doBatchMutate(mutation, atomic, HConstants.NO_NONCE, HConstants.NO_NONCE); return mutate(mutation, atomic, HConstants.NO_NONCE, HConstants.NO_NONCE);
} }
private OperationStatus doBatchMutate(Mutation mutation, boolean atomic, long nonceGroup, private OperationStatus mutate(Mutation mutation, boolean atomic, long nonceGroup, long nonce)
long nonce) throws IOException { throws IOException {
OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation}, atomic, OperationStatus[] status =
nonceGroup, nonce); this.batchMutate(new Mutation[] { mutation }, atomic, nonceGroup, nonce);
if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) { if (status[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg()); throw new FailedSanityCheckException(status[0].getExceptionMsg());
} else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) { } else if (status[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg()); throw new NoSuchColumnFamilyException(status[0].getExceptionMsg());
} else if (batchMutate[0].getOperationStatusCode().equals( } else if (status[0].getOperationStatusCode().equals(OperationStatusCode.STORE_TOO_BUSY)) {
OperationStatusCode.STORE_TOO_BUSY)) { throw new RegionTooBusyException(status[0].getExceptionMsg());
throw new RegionTooBusyException(batchMutate[0].getExceptionMsg());
} }
return batchMutate[0]; return status[0];
} }
/** /**
@ -5055,7 +5037,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/** /**
* Possibly rewrite incoming cell tags. * Possibly rewrite incoming cell tags.
*/ */
void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) { private void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
// Check if we have any work to do and early out otherwise // Check if we have any work to do and early out otherwise
// Update these checks as more logic is added here // Update these checks as more logic is added here
if (m.getTTL() == Long.MAX_VALUE) { if (m.getTTL() == Long.MAX_VALUE) {
@ -5077,15 +5059,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
} }
/* /**
* Check if resources to support an update. * Check if resources to support an update.
* * <p/>
* We throw RegionTooBusyException if above memstore limit * We throw RegionTooBusyException if above memstore limit and expect client to retry using some
* and expect client to retry using some kind of backoff * kind of backoff
*/ */
void checkResources() throws RegionTooBusyException { private void checkResources() throws RegionTooBusyException {
// If catalog region, do not impose resource constraints or block updates. // If catalog region, do not impose resource constraints or block updates.
if (this.getRegionInfo().isMetaRegion()) return; if (this.getRegionInfo().isMetaRegion()) {
return;
}
MemStoreSize mss = this.memStoreSizing.getMemStoreSize(); MemStoreSize mss = this.memStoreSizing.getMemStoreSize();
if (mss.getHeapSize() + mss.getOffHeapSize() > this.blockingMemStoreSize) { if (mss.getHeapSize() + mss.getOffHeapSize() > this.blockingMemStoreSize) {
@ -5110,13 +5094,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/** /**
* @throws IOException Throws exception if region is in read-only mode. * @throws IOException Throws exception if region is in read-only mode.
*/ */
protected void checkReadOnly() throws IOException { private void checkReadOnly() throws IOException {
if (isReadOnly()) { if (isReadOnly()) {
throw new DoNotRetryIOException("region is read only"); throw new DoNotRetryIOException("region is read only");
} }
} }
protected void checkReadsEnabled() throws IOException { private void checkReadsEnabled() throws IOException {
if (!this.writestate.readsEnabled) { if (!this.writestate.readsEnabled) {
throw new IOException(getRegionInfo().getEncodedName() throw new IOException(getRegionInfo().getEncodedName()
+ ": The region's reads are disabled. Cannot serve the request"); + ": The region's reads are disabled. Cannot serve the request");
@ -5130,21 +5114,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.writestate.setReadsEnabled(readsEnabled); this.writestate.setReadsEnabled(readsEnabled);
} }
/**
* Add updates first to the wal and then add values to memstore.
* <p>
* Warning: Assumption is caller has lock on passed in row.
* @param edits Cell updates by column
*/
void put(final byte[] row, byte[] family, List<Cell> edits) throws IOException {
NavigableMap<byte[], List<Cell>> familyMap;
familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
familyMap.put(family, edits);
Put p = new Put(row, HConstants.LATEST_TIMESTAMP, familyMap);
doBatchMutate(p);
}
/** /**
* @param delta If we are doing delta changes -- e.g. increment/append -- then this flag will be * @param delta If we are doing delta changes -- e.g. increment/append -- then this flag will be
* set; when set we will run operations that make sense in the increment/append scenario * set; when set we will run operations that make sense in the increment/append scenario
@ -5194,7 +5163,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
} }
void checkFamily(final byte[] family) throws NoSuchColumnFamilyException { private void checkFamily(final byte[] family) throws NoSuchColumnFamilyException {
if (!this.htableDescriptor.hasColumnFamily(family)) { if (!this.htableDescriptor.hasColumnFamily(family)) {
throw new NoSuchColumnFamilyException( throw new NoSuchColumnFamilyException(
"Column family " + Bytes.toString(family) + " does not exist in region " + this "Column family " + Bytes.toString(family) + " does not exist in region " + this
@ -6055,7 +6024,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Currently, this method is used to drop memstore to prevent memory leak * Currently, this method is used to drop memstore to prevent memory leak
* when replaying recovered.edits while opening region. * when replaying recovered.edits while opening region.
*/ */
public MemStoreSize dropMemStoreContents() throws IOException { private MemStoreSize dropMemStoreContents() throws IOException {
MemStoreSizing totalFreedSize = new NonThreadSafeMemStoreSizing(); MemStoreSizing totalFreedSize = new NonThreadSafeMemStoreSizing();
this.updatesLock.writeLock().lock(); this.updatesLock.writeLock().lock();
try { try {
@ -8106,11 +8075,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/** /**
* Open HRegion. * Open HRegion.
* <p/>
* Calls initialize and sets sequenceId. * Calls initialize and sets sequenceId.
* @return Returns <code>this</code> * @return Returns <code>this</code>
*/ */
protected HRegion openHRegion(final CancelableProgressable reporter) private HRegion openHRegion(final CancelableProgressable reporter) throws IOException {
throws IOException {
try { try {
// Refuse to open the region if we are missing local compression support // Refuse to open the region if we are missing local compression support
TableDescriptorChecker.checkCompression(htableDescriptor); TableDescriptorChecker.checkCompression(htableDescriptor);
@ -8255,7 +8224,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return get(get, withCoprocessor, HConstants.NO_NONCE, HConstants.NO_NONCE); return get(get, withCoprocessor, HConstants.NO_NONCE, HConstants.NO_NONCE);
} }
public List<Cell> get(Get get, boolean withCoprocessor, long nonceGroup, long nonce) private List<Cell> get(Get get, boolean withCoprocessor, long nonceGroup, long nonce)
throws IOException { throws IOException {
List<Cell> results = new ArrayList<>(); List<Cell> results = new ArrayList<>();
long before = EnvironmentEdgeManager.currentTime(); long before = EnvironmentEdgeManager.currentTime();
@ -8619,7 +8588,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
startRegionOperation(Operation.APPEND); startRegionOperation(Operation.APPEND);
try { try {
// All edits for the given row (across all column families) must happen atomically. // All edits for the given row (across all column families) must happen atomically.
return doBatchMutate(append, true, nonceGroup, nonce).getResult(); return mutate(append, true, nonceGroup, nonce).getResult();
} finally { } finally {
closeRegionOperation(Operation.APPEND); closeRegionOperation(Operation.APPEND);
} }
@ -8636,7 +8605,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
startRegionOperation(Operation.INCREMENT); startRegionOperation(Operation.INCREMENT);
try { try {
// All edits for the given row (across all column families) must happen atomically. // All edits for the given row (across all column families) must happen atomically.
return doBatchMutate(increment, true, nonceGroup, nonce).getResult(); return mutate(increment, true, nonceGroup, nonce).getResult();
} finally { } finally {
closeRegionOperation(Operation.INCREMENT); closeRegionOperation(Operation.INCREMENT);
} }
@ -9176,15 +9145,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
flushesQueued.increment(); flushesQueued.increment();
} }
public long getReadPoint() {
return getReadPoint(IsolationLevel.READ_COMMITTED);
}
/** /**
* If a handler thread is eligible for interrupt, make it ineligible. Should be paired * If a handler thread is eligible for interrupt, make it ineligible. Should be paired
* with {{@link #enableInterrupts()}. * with {{@link #enableInterrupts()}.
*/ */
protected void disableInterrupts() { void disableInterrupts() {
regionLockHolders.computeIfPresent(Thread.currentThread(), (t,b) -> false); regionLockHolders.computeIfPresent(Thread.currentThread(), (t,b) -> false);
} }
@ -9192,7 +9157,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* If a handler thread was made ineligible for interrupt via {{@link #disableInterrupts()}, * If a handler thread was made ineligible for interrupt via {{@link #disableInterrupts()},
* make it eligible again. No-op if interrupts are already enabled. * make it eligible again. No-op if interrupts are already enabled.
*/ */
protected void enableInterrupts() { void enableInterrupts() {
regionLockHolders.computeIfPresent(Thread.currentThread(), (t,b) -> true); regionLockHolders.computeIfPresent(Thread.currentThread(), (t,b) -> true);
} }
@ -9364,7 +9329,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* features * features
* @param conf region configurations * @param conf region configurations
*/ */
static void decorateRegionConfiguration(Configuration conf) { private static void decorateRegionConfiguration(Configuration conf) {
if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) { if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
String plugins = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,""); String plugins = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,"");
String replicationCoprocessorClass = ReplicationObserver.class.getCanonicalName(); String replicationCoprocessorClass = ReplicationObserver.class.getCanonicalName();

View File

@ -1003,8 +1003,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2)); Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
} }
OperationStatus[] codes = region.batchMutate(mArray, atomic, HConstants.NO_NONCE, OperationStatus[] codes = region.batchMutate(mArray, atomic);
HConstants.NO_NONCE);
// When atomic is true, it indicates that the mutateRow API or the batch API with // When atomic is true, it indicates that the mutateRow API or the batch API with
// RowMutations is called. In this case, we need to merge the results of the // RowMutations is called. In this case, we need to merge the results of the

View File

@ -710,8 +710,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
mss = memstore.getFlushableSize(); mss = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
// simulate flusher // simulate flusher
region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(), region.decrMemStoreSize(mss);
mss.getCellsCount());
ImmutableSegment s = memstore.getSnapshot(); ImmutableSegment s = memstore.getSnapshot();
assertEquals(7, s.getCellsCount()); assertEquals(7, s.getCellsCount());
assertEquals(0, regionServicesForStores.getMemStoreSize()); assertEquals(0, regionServicesForStores.getMemStoreSize());
@ -788,8 +787,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
mss = memstore.getFlushableSize(); mss = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
// simulate flusher // simulate flusher
region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(), region.decrMemStoreSize(mss);
mss.getCellsCount());
ImmutableSegment s = memstore.getSnapshot(); ImmutableSegment s = memstore.getSnapshot();
assertEquals(4, s.getCellsCount()); assertEquals(4, s.getCellsCount());
assertEquals(0, regionServicesForStores.getMemStoreSize()); assertEquals(0, regionServicesForStores.getMemStoreSize());

View File

@ -282,8 +282,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
mss = memstore.getFlushableSize(); mss = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
// simulate flusher // simulate flusher
region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(), region.decrMemStoreSize(mss);
mss.getCellsCount());
ImmutableSegment s = memstore.getSnapshot(); ImmutableSegment s = memstore.getSnapshot();
assertEquals(4, s.getCellsCount()); assertEquals(4, s.getCellsCount());
assertEquals(0, regionServicesForStores.getMemStoreSize()); assertEquals(0, regionServicesForStores.getMemStoreSize());

View File

@ -26,7 +26,9 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.NavigableMap;
import java.util.Objects; import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -971,28 +973,23 @@ public class TestDefaultMemStore {
} }
/** /**
* Inserts a new region's meta information into the passed * Inserts a new region's meta information into the passed <code>meta</code> region.
* <code>meta</code> region. Used by the HMaster bootstrap code adding
* new table to hbase:meta table.
*
* @param meta hbase:meta HRegion to be updated * @param meta hbase:meta HRegion to be updated
* @param r HRegion to add to <code>meta</code> * @param r HRegion to add to <code>meta</code>
*
* @throws IOException
*/ */
public static void addRegionToMETA(final HRegion meta, final HRegion r) throws IOException { private static void addRegionToMETA(final HRegion meta, final HRegion r) throws IOException {
meta.checkResources();
// The row key is the region name // The row key is the region name
byte[] row = r.getRegionInfo().getRegionName(); byte[] row = r.getRegionInfo().getRegionName();
final long now = EnvironmentEdgeManager.currentTime(); final long now = EnvironmentEdgeManager.currentTime();
final List<Cell> cells = new ArrayList<>(2); final List<Cell> cells = new ArrayList<>(2);
cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY, cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER, now,
HConstants.REGIONINFO_QUALIFIER, now, RegionInfo.toByteArray(r.getRegionInfo()))); RegionInfo.toByteArray(r.getRegionInfo())));
// Set into the root table the version of the meta table. // Set into the root table the version of the meta table.
cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY, cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY, HConstants.META_VERSION_QUALIFIER, now,
HConstants.META_VERSION_QUALIFIER, now,
Bytes.toBytes(HConstants.META_VERSION))); Bytes.toBytes(HConstants.META_VERSION)));
meta.put(row, HConstants.CATALOG_FAMILY, cells); NavigableMap<byte[], List<Cell>> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
familyMap.put(HConstants.CATALOG_FAMILY, cells);
meta.put(new Put(row, HConstants.LATEST_TIMESTAMP, familyMap));
} }
private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge { private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge {

View File

@ -26,6 +26,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
@ -43,7 +44,6 @@ import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -137,7 +137,6 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.HRegion.MutationBatchOperation;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.Region.RowLock; import org.apache.hadoop.hbase.regionserver.Region.RowLock;
@ -1679,9 +1678,7 @@ public class TestHRegion {
long syncs = prepareRegionForBachPut(puts, source, false); long syncs = prepareRegionForBachPut(puts, source, false);
// 1. Straight forward case, should succeed // 1. Straight forward case, should succeed
MutationBatchOperation batchOp = new MutationBatchOperation(region, puts, true, OperationStatus[] codes = this.region.batchMutate(puts, true);
HConstants.NO_NONCE, HConstants.NO_NONCE);
OperationStatus[] codes = this.region.batchMutate(batchOp);
assertEquals(10, codes.length); assertEquals(10, codes.length);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode()); assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
@ -1695,15 +1692,11 @@ public class TestHRegion {
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF); MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
final AtomicReference<IOException> retFromThread = new AtomicReference<>(); final AtomicReference<IOException> retFromThread = new AtomicReference<>();
final CountDownLatch finishedPuts = new CountDownLatch(1); final CountDownLatch finishedPuts = new CountDownLatch(1);
final MutationBatchOperation finalBatchOp = new MutationBatchOperation(region, puts, true,
HConstants
.NO_NONCE,
HConstants.NO_NONCE);
TestThread putter = new TestThread(ctx) { TestThread putter = new TestThread(ctx) {
@Override @Override
public void doWork() throws IOException { public void doWork() throws IOException {
try { try {
region.batchMutate(finalBatchOp); region.batchMutate(puts, true);
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.error("test failed!", ioe); LOG.error("test failed!", ioe);
retFromThread.set(ioe); retFromThread.set(ioe);
@ -1730,10 +1723,8 @@ public class TestHRegion {
// 3. Exception thrown in validation // 3. Exception thrown in validation
LOG.info("Next a batch put with one invalid family"); LOG.info("Next a batch put with one invalid family");
puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value); puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
batchOp = new MutationBatchOperation(region, puts, true, HConstants.NO_NONCE,
HConstants.NO_NONCE);
thrown.expect(NoSuchColumnFamilyException.class); thrown.expect(NoSuchColumnFamilyException.class);
this.region.batchMutate(batchOp); this.region.batchMutate(puts, true);
} }
@Test @Test
@ -3172,23 +3163,19 @@ public class TestHRegion {
List<Cell> kvs = new ArrayList<>(); List<Cell> kvs = new ArrayList<>();
kvs.add(new KeyValue(row1, fam4, null, null)); kvs.add(new KeyValue(row1, fam4, null, null));
byte[] forUnitTestsOnly = Bytes.toBytes("ForUnitTestsOnly");
// testing existing family // testing existing family
byte[] family = fam2;
NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
deleteMap.put(family, kvs); deleteMap.put(fam2, kvs);
region.delete(deleteMap, Durability.SYNC_WAL); region.delete(new Delete(forUnitTestsOnly, HConstants.LATEST_TIMESTAMP, deleteMap));
// testing non existing family // testing non existing family
boolean ok = false; NavigableMap<byte[], List<Cell>> deleteMap2 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
family = fam4; deleteMap2.put(fam4, kvs);
try { assertThrows("Family " + Bytes.toString(fam4) + " does exist",
deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); NoSuchColumnFamilyException.class,
deleteMap.put(family, kvs); () -> region.delete(new Delete(forUnitTestsOnly, HConstants.LATEST_TIMESTAMP, deleteMap2)));
region.delete(deleteMap, Durability.SYNC_WAL);
} catch (Exception e) {
ok = true;
}
assertTrue("Family " + new String(family, StandardCharsets.UTF_8) + " does exist", ok);
} }
@Test @Test
@ -3549,6 +3536,8 @@ public class TestHRegion {
byte[] col2 = Bytes.toBytes("col2"); byte[] col2 = Bytes.toBytes("col2");
byte[] col3 = Bytes.toBytes("col3"); byte[] col3 = Bytes.toBytes("col3");
byte[] forUnitTestsOnly = Bytes.toBytes("ForUnitTestsOnly");
// Setting up region // Setting up region
this.region = initHRegion(tableName, method, CONF, fam1); this.region = initHRegion(tableName, method, CONF, fam1);
// Building checkerList // Building checkerList
@ -3559,12 +3548,12 @@ public class TestHRegion {
NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
deleteMap.put(fam1, kvs); deleteMap.put(fam1, kvs);
region.delete(deleteMap, Durability.SYNC_WAL); region.delete(new Delete(forUnitTestsOnly, HConstants.LATEST_TIMESTAMP, deleteMap));
// extract the key values out the memstore: // extract the key values out the memstore:
// This is kinda hacky, but better than nothing... // This is kinda hacky, but better than nothing...
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
AbstractMemStore memstore = (AbstractMemStore)region.getStore(fam1).memstore; AbstractMemStore memstore = (AbstractMemStore) region.getStore(fam1).memstore;
Cell firstCell = memstore.getActive().first(); Cell firstCell = memstore.getActive().first();
assertTrue(firstCell.getTimestamp() <= now); assertTrue(firstCell.getTimestamp() <= now);
now = firstCell.getTimestamp(); now = firstCell.getTimestamp();