HBASE-6698 Refactor checkAndPut and checkAndDelete to use doMiniBatchMutation
(Priya) Submitted by:PrIya Reviewed by:Ram, Stack, Ted, Lars git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1388141 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
70639863eb
commit
55b87a3c96
|
@ -1783,8 +1783,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
|
||||
try {
|
||||
// All edits for the given row (across all column families) must happen atomically.
|
||||
prepareDelete(delete);
|
||||
internalDelete(delete, delete.getClusterId(), writeToWAL);
|
||||
doBatchMutate(delete, lid);
|
||||
} finally {
|
||||
if(lockid == null) releaseRowLock(lid);
|
||||
}
|
||||
|
@ -1801,11 +1800,11 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
*/
|
||||
void delete(Map<byte[], List<KeyValue>> familyMap, UUID clusterId,
|
||||
boolean writeToWAL) throws IOException {
|
||||
Delete delete = new Delete();
|
||||
Delete delete = new Delete(HConstants.EMPTY_BYTE_ARRAY);
|
||||
delete.setFamilyMap(familyMap);
|
||||
delete.setClusterId(clusterId);
|
||||
delete.setWriteToWAL(writeToWAL);
|
||||
internalDelete(delete, clusterId, writeToWAL);
|
||||
doBatchMutate(delete, null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1862,65 +1861,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param delete The Delete command
|
||||
* @param clusterId UUID of the originating cluster (for replication).
|
||||
* @param writeToWAL
|
||||
* @throws IOException
|
||||
*/
|
||||
private void internalDelete(Delete delete, UUID clusterId,
|
||||
boolean writeToWAL) throws IOException {
|
||||
Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap();
|
||||
WALEdit walEdit = new WALEdit();
|
||||
/* Run coprocessor pre hook outside of locks to avoid deadlock */
|
||||
if (coprocessorHost != null) {
|
||||
if (coprocessorHost.preDelete(delete, walEdit, writeToWAL)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||
byte [] byteNow = Bytes.toBytes(now);
|
||||
boolean flush = false;
|
||||
|
||||
updatesLock.readLock().lock();
|
||||
try {
|
||||
prepareDeleteTimestamps(delete.getFamilyMap(), byteNow);
|
||||
|
||||
if (writeToWAL) {
|
||||
// write/sync to WAL should happen before we touch memstore.
|
||||
//
|
||||
// If order is reversed, i.e. we write to memstore first, and
|
||||
// for some reason fail to write/sync to commit log, the memstore
|
||||
// will contain uncommitted transactions.
|
||||
//
|
||||
// bunch up all edits across all column families into a
|
||||
// single WALEdit.
|
||||
addFamilyMapToWALEdit(familyMap, walEdit);
|
||||
this.log.append(regionInfo, this.htableDescriptor.getName(),
|
||||
walEdit, clusterId, now, this.htableDescriptor);
|
||||
}
|
||||
|
||||
// Now make changes to the memstore.
|
||||
long addedSize = applyFamilyMapToMemstore(familyMap, null);
|
||||
flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
|
||||
|
||||
} finally {
|
||||
this.updatesLock.readLock().unlock();
|
||||
}
|
||||
// do after lock
|
||||
if (coprocessorHost != null) {
|
||||
coprocessorHost.postDelete(delete, walEdit, writeToWAL);
|
||||
}
|
||||
final long after = EnvironmentEdgeManager.currentTimeMillis();
|
||||
this.opMetrics.updateDeleteMetrics(familyMap.keySet(), after-now);
|
||||
|
||||
if (flush) {
|
||||
// Request a cache flush. Do it outside update lock.
|
||||
requestFlush();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param put
|
||||
* @throws IOException
|
||||
|
@ -1978,7 +1918,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
|
||||
try {
|
||||
// All edits for the given row (across all column families) must happen atomically.
|
||||
internalPut(put, put.getClusterId(), writeToWAL);
|
||||
doBatchMutate(put, lid);
|
||||
} finally {
|
||||
if(lockid == null) releaseRowLock(lid);
|
||||
}
|
||||
|
@ -2444,7 +2384,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
|
||||
startRegionOperation();
|
||||
this.writeRequestsCount.increment();
|
||||
try {
|
||||
RowLock lock = isPut ? ((Put)w).getRowLock() : ((Delete)w).getRowLock();
|
||||
Get get = new Get(row, lock);
|
||||
|
@ -2496,17 +2435,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
if (matches) {
|
||||
// All edits for the given row (across all column families) must
|
||||
// happen atomically.
|
||||
//
|
||||
// Using default cluster id, as this can only happen in the
|
||||
// originating cluster. A slave cluster receives the result as a Put
|
||||
// or Delete
|
||||
if (isPut) {
|
||||
internalPut(((Put) w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
|
||||
} else {
|
||||
Delete d = (Delete)w;
|
||||
prepareDelete(d);
|
||||
internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
|
||||
}
|
||||
doBatchMutate((Mutation)w, lid);
|
||||
this.checkAndMutateChecksPassed.increment();
|
||||
return true;
|
||||
}
|
||||
|
@ -2520,6 +2449,18 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void doBatchMutate(Mutation mutation, Integer lid) throws IOException,
|
||||
DoNotRetryIOException {
|
||||
Pair<Mutation, Integer>[] mutateWithLocks = new Pair[] { new Pair<Mutation, Integer>(mutation,
|
||||
lid) };
|
||||
OperationStatus[] batchMutate = this.batchMutate(mutateWithLocks);
|
||||
if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
|
||||
throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
|
||||
} else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
|
||||
throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP}
|
||||
|
@ -2592,7 +2533,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* @praram now
|
||||
* @throws IOException
|
||||
*/
|
||||
private void put(byte [] family, List<KeyValue> edits)
|
||||
private void put(byte [] family, List<KeyValue> edits, Integer lid)
|
||||
throws IOException {
|
||||
Map<byte[], List<KeyValue>> familyMap;
|
||||
familyMap = new HashMap<byte[], List<KeyValue>>();
|
||||
|
@ -2602,70 +2543,9 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
p.setFamilyMap(familyMap);
|
||||
p.setClusterId(HConstants.DEFAULT_CLUSTER_ID);
|
||||
p.setWriteToWAL(true);
|
||||
this.internalPut(p, HConstants.DEFAULT_CLUSTER_ID, true);
|
||||
doBatchMutate(p, lid);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add updates first to the hlog (if writeToWal) and then add values to memstore.
|
||||
* Warning: Assumption is caller has lock on passed in row.
|
||||
* @param put The Put command
|
||||
* @param clusterId UUID of the originating cluster (for replication).
|
||||
* @param writeToWAL if true, then we should write to the log
|
||||
* @throws IOException
|
||||
*/
|
||||
private void internalPut(Put put, UUID clusterId, boolean writeToWAL) throws IOException {
|
||||
Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
|
||||
WALEdit walEdit = new WALEdit();
|
||||
/* run pre put hook outside of lock to avoid deadlock */
|
||||
if (coprocessorHost != null) {
|
||||
if (coprocessorHost.prePut(put, walEdit, writeToWAL)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||
byte[] byteNow = Bytes.toBytes(now);
|
||||
boolean flush = false;
|
||||
|
||||
this.updatesLock.readLock().lock();
|
||||
try {
|
||||
checkFamilies(familyMap.keySet());
|
||||
checkTimestamps(familyMap, now);
|
||||
updateKVTimestamps(familyMap.values(), byteNow);
|
||||
// write/sync to WAL should happen before we touch memstore.
|
||||
//
|
||||
// If order is reversed, i.e. we write to memstore first, and
|
||||
// for some reason fail to write/sync to commit log, the memstore
|
||||
// will contain uncommitted transactions.
|
||||
if (writeToWAL) {
|
||||
addFamilyMapToWALEdit(familyMap, walEdit);
|
||||
this.log.append(regionInfo, this.htableDescriptor.getName(),
|
||||
walEdit, clusterId, now, this.htableDescriptor);
|
||||
} else {
|
||||
recordPutWithoutWal(familyMap);
|
||||
}
|
||||
|
||||
long addedSize = applyFamilyMapToMemstore(familyMap, null);
|
||||
flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
|
||||
} finally {
|
||||
this.updatesLock.readLock().unlock();
|
||||
}
|
||||
|
||||
if (coprocessorHost != null) {
|
||||
coprocessorHost.postPut(put, walEdit, writeToWAL);
|
||||
}
|
||||
|
||||
// do after lock
|
||||
final long after = EnvironmentEdgeManager.currentTimeMillis();
|
||||
this.opMetrics.updatePutMetrics(familyMap.keySet(), after - now);
|
||||
|
||||
|
||||
if (flush) {
|
||||
// Request a cache flush. Do it outside update lock.
|
||||
requestFlush();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Atomically apply the given map of family->edits to the memstore.
|
||||
* This handles the consistency control on its own, but the caller
|
||||
|
@ -4019,7 +3899,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
edits.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
|
||||
HConstants.META_VERSION_QUALIFIER, now,
|
||||
Bytes.toBytes(HConstants.META_VERSION)));
|
||||
meta.put(HConstants.CATALOG_FAMILY, edits);
|
||||
meta.put(HConstants.CATALOG_FAMILY, edits, lid);
|
||||
} finally {
|
||||
meta.releaseRowLock(lid);
|
||||
}
|
||||
|
|
|
@ -189,10 +189,10 @@ public class TestRegionServerMetrics {
|
|||
assertTimeVaryingMetricCount(1, TABLE_NAME, cf, regionName, "append_");
|
||||
|
||||
// One delete where the cf is known
|
||||
assertTimeVaryingMetricCount(1, TABLE_NAME, cf, null, "delete_");
|
||||
assertTimeVaryingMetricCount(1, TABLE_NAME, cf, null, "multidelete_");
|
||||
|
||||
// two deletes in the region.
|
||||
assertTimeVaryingMetricCount(2, TABLE_NAME, null, regionName, "delete_");
|
||||
assertTimeVaryingMetricCount(2, TABLE_NAME, null, regionName, "multidelete_");
|
||||
|
||||
// Three gets. one for gets. One for append. One for increment.
|
||||
assertTimeVaryingMetricCount(3, TABLE_NAME, cf, regionName, "get_");
|
||||
|
|
Loading…
Reference in New Issue