HBASE-5203 Group atomic put/delete operation into a single WALEdit to handle region server failures. (Lars H)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1232551 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
larsh 2012-01-17 19:38:13 +00:00
parent 25dc6b2530
commit 01246cdc99
5 changed files with 264 additions and 232 deletions

View File

@ -137,6 +137,35 @@ public class Delete extends Mutation
familyMap.put(kv.getFamily(), list);
}
/**
* Advanced use only.
* Add an existing delete marker to this Delete object.
* @param kv An existing KeyValue of type "delete".
* @return this for invocation chaining
* @throws IOException
*/
public Delete addDeleteMarker(KeyValue kv) throws IOException {
if (!kv.isDelete()) {
throw new IOException("The recently added KeyValue is not of type "
+ "delete. Rowkey: " + Bytes.toStringBinary(this.row));
}
if (Bytes.compareTo(this.row, 0, row.length, kv.getBuffer(),
kv.getRowOffset(), kv.getRowLength()) != 0) {
throw new IOException("The row in the recently added KeyValue "
+ Bytes.toStringBinary(kv.getBuffer(), kv.getRowOffset(),
kv.getRowLength()) + " doesn't match the original one "
+ Bytes.toStringBinary(this.row));
}
byte [] family = kv.getFamily();
List<KeyValue> list = familyMap.get(family);
if (list == null) {
list = new ArrayList<KeyValue>();
}
list.add(kv);
familyMap.put(family, list);
return this;
}
/**
* Delete all versions of all columns of the specified family.
* <p>

View File

@ -1686,7 +1686,7 @@ public class HRegion implements HeapSize { // , Writable{
try {
// All edits for the given row (across all column families) must happen atomically.
prepareDelete(delete);
internalDelete(delete, delete.getClusterId(), writeToWAL, null, null);
internalDelete(delete, delete.getClusterId(), writeToWAL);
} finally {
if(lockid == null) releaseRowLock(lid);
}
@ -1707,26 +1707,77 @@ public class HRegion implements HeapSize { // , Writable{
delete.setFamilyMap(familyMap);
delete.setClusterId(clusterId);
delete.setWriteToWAL(writeToWAL);
internalDelete(delete, clusterId, writeToWAL, null, null);
internalDelete(delete, clusterId, writeToWAL);
}
/**
* Setup a Delete object with correct timestamps.
* Caller should the row and region locks.
* @param delete
* @param now
* @throws IOException
*/
private void prepareDeleteTimestamps(Delete delete, byte[] byteNow)
throws IOException {
Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap();
for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
List<KeyValue> kvs = e.getValue();
Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
for (KeyValue kv: kvs) {
// Check if time is LATEST, change to time of most recent addition if so
// This is expensive.
if (kv.isLatestTimestamp() && kv.isDeleteType()) {
byte[] qual = kv.getQualifier();
if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
Integer count = kvCount.get(qual);
if (count == null) {
kvCount.put(qual, 1);
} else {
kvCount.put(qual, count + 1);
}
count = kvCount.get(qual);
Get get = new Get(kv.getRow());
get.setMaxVersions(count);
get.addColumn(family, qual);
List<KeyValue> result = get(get, false);
if (result.size() < count) {
// Nothing to delete
kv.updateLatestStamp(byteNow);
continue;
}
if (result.size() > count) {
throw new RuntimeException("Unexpected size: " + result.size());
}
KeyValue getkv = result.get(count - 1);
Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
} else {
kv.updateLatestStamp(byteNow);
}
}
}
}
/**
* @param delete The Delete command
* @param familyMap map of family to edits for the given family.
* @param clusterId UUID of the originating cluster (for replication).
* @param writeToWAL
* @param writeEntry Optional mvcc write point to use
* @param walEdit Optional walEdit to use. A non-null walEdit indicates
* that the coprocessor hooks are run by the caller
* @throws IOException
*/
private void internalDelete(Delete delete, UUID clusterId,
boolean writeToWAL, MultiVersionConsistencyControl.WriteEntry writeEntry,
WALEdit walEdit) throws IOException {
boolean writeToWAL) throws IOException {
Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap();
WALEdit localWalEdit = walEdit == null ? new WALEdit() : walEdit;
WALEdit walEdit = new WALEdit();
/* Run coprocessor pre hook outside of locks to avoid deadlock */
if (coprocessorHost != null && walEdit == null) {
if (coprocessorHost.preDelete(delete, localWalEdit, writeToWAL)) {
if (coprocessorHost != null) {
if (coprocessorHost.preDelete(delete, walEdit, writeToWAL)) {
return;
}
}
@ -1737,49 +1788,7 @@ public class HRegion implements HeapSize { // , Writable{
updatesLock.readLock().lock();
try {
for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
List<KeyValue> kvs = e.getValue();
Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
for (KeyValue kv: kvs) {
// Check if time is LATEST, change to time of most recent addition if so
// This is expensive.
if (kv.isLatestTimestamp() && kv.isDeleteType()) {
byte[] qual = kv.getQualifier();
if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
Integer count = kvCount.get(qual);
if (count == null) {
kvCount.put(qual, 1);
} else {
kvCount.put(qual, count + 1);
}
count = kvCount.get(qual);
Get get = new Get(kv.getRow());
get.setMaxVersions(count);
get.addColumn(family, qual);
List<KeyValue> result = get(get, false);
if (result.size() < count) {
// Nothing to delete
kv.updateLatestStamp(byteNow);
continue;
}
if (result.size() > count) {
throw new RuntimeException("Unexpected size: " + result.size());
}
KeyValue getkv = result.get(count - 1);
Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
} else {
kv.updateLatestStamp(byteNow);
}
}
}
prepareDeleteTimestamps(delete, byteNow);
if (writeToWAL) {
// write/sync to WAL should happen before we touch memstore.
@ -1790,21 +1799,21 @@ public class HRegion implements HeapSize { // , Writable{
//
// bunch up all edits across all column families into a
// single WALEdit.
addFamilyMapToWALEdit(familyMap, localWalEdit);
addFamilyMapToWALEdit(familyMap, walEdit);
this.log.append(regionInfo, this.htableDescriptor.getName(),
localWalEdit, clusterId, now, this.htableDescriptor);
walEdit, clusterId, now, this.htableDescriptor);
}
// Now make changes to the memstore.
long addedSize = applyFamilyMapToMemstore(familyMap, writeEntry);
long addedSize = applyFamilyMapToMemstore(familyMap, null);
flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
} finally {
this.updatesLock.readLock().unlock();
}
// do after lock
if (coprocessorHost != null && walEdit == null) {
coprocessorHost.postDelete(delete, localWalEdit, writeToWAL);
if (coprocessorHost != null) {
coprocessorHost.postDelete(delete, walEdit, writeToWAL);
}
final long after = EnvironmentEdgeManager.currentTimeMillis();
final String metricPrefix = SchemaMetrics.generateSchemaMetricsPrefix(
@ -1876,7 +1885,7 @@ public class HRegion implements HeapSize { // , Writable{
try {
// All edits for the given row (across all column families) must happen atomically.
internalPut(put, put.getClusterId(), writeToWAL, null, null);
internalPut(put, put.getClusterId(), writeToWAL);
} finally {
if(lockid == null) releaseRowLock(lid);
}
@ -2305,13 +2314,11 @@ public class HRegion implements HeapSize { // , Writable{
// originating cluster. A slave cluster receives the result as a Put
// or Delete
if (isPut) {
internalPut(((Put) w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL,
null, null);
internalPut(((Put) w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
} else {
Delete d = (Delete)w;
prepareDelete(d);
internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL, null,
null);
internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
}
return true;
}
@ -2406,26 +2413,23 @@ public class HRegion implements HeapSize { // , Writable{
p.setFamilyMap(familyMap);
p.setClusterId(HConstants.DEFAULT_CLUSTER_ID);
p.setWriteToWAL(true);
this.internalPut(p, HConstants.DEFAULT_CLUSTER_ID, true, null, null);
this.internalPut(p, HConstants.DEFAULT_CLUSTER_ID, true);
}
/**
* Add updates first to the hlog (if writeToWal) and then add values to memstore.
* Warning: Assumption is caller has lock on passed in row.
* @param put The Put command
* @param clusterId UUID of the originating cluster (for replication).
* @param writeToWAL if true, then we should write to the log
* @param writeEntry Optional mvcc write point to use
* @param walEdit Optional walEdit to use. A non-null walEdit indicates
* that the coprocessor hooks are run by the caller
* @throws IOException
*/
private void internalPut(Put put, UUID clusterId, boolean writeToWAL,
MultiVersionConsistencyControl.WriteEntry writeEntry, WALEdit walEdit) throws IOException {
private void internalPut(Put put, UUID clusterId, boolean writeToWAL) throws IOException {
Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
WALEdit localWalEdit = walEdit == null ? new WALEdit() : walEdit;
WALEdit walEdit = new WALEdit();
/* run pre put hook outside of lock to avoid deadlock */
if (coprocessorHost != null && walEdit == null) {
if (coprocessorHost.prePut(put, localWalEdit, writeToWAL)) {
if (coprocessorHost != null) {
if (coprocessorHost.prePut(put, walEdit, writeToWAL)) {
return;
}
}
@ -2445,19 +2449,19 @@ public class HRegion implements HeapSize { // , Writable{
// for some reason fail to write/sync to commit log, the memstore
// will contain uncommitted transactions.
if (writeToWAL) {
addFamilyMapToWALEdit(familyMap, localWalEdit);
addFamilyMapToWALEdit(familyMap, walEdit);
this.log.append(regionInfo, this.htableDescriptor.getName(),
localWalEdit, clusterId, now, this.htableDescriptor);
walEdit, clusterId, now, this.htableDescriptor);
}
long addedSize = applyFamilyMapToMemstore(familyMap, writeEntry);
long addedSize = applyFamilyMapToMemstore(familyMap, null);
flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
} finally {
this.updatesLock.readLock().unlock();
}
if (coprocessorHost != null && walEdit == null) {
coprocessorHost.postPut(put, localWalEdit, writeToWAL);
if (coprocessorHost != null) {
coprocessorHost.postPut(put, walEdit, writeToWAL);
}
// do after lock
@ -4140,92 +4144,107 @@ public class HRegion implements HeapSize { // , Writable{
return results;
}
public int mutateRow(RowMutation rm,
public void mutateRow(RowMutation rm,
Integer lockid) throws IOException {
boolean flush = false;
startRegionOperation();
List<WALEdit> walEdits = new ArrayList<WALEdit>(rm.getMutations().size());
// 1. run all pre-hooks before the atomic operation
// if any pre hook indicates "bypass", bypass the entire operation
// Note that this requires creating the WALEdits here and passing
// them to the actual Put/Delete operations.
for (Mutation m : rm.getMutations()) {
WALEdit walEdit = new WALEdit();
walEdits.add(walEdit);
if (coprocessorHost == null) {
continue;
}
if (m instanceof Put) {
if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) {
// by pass everything
return 0;
}
} else if (m instanceof Delete) {
Delete d = (Delete) m;
prepareDelete(d);
if (coprocessorHost.preDelete(d, walEdit, d.getWriteToWAL())) {
// by pass everything
return 0;
}
}
}
// 2. acquire the row lock
Integer lid = getLock(lockid, rm.getRow(), true);
// 3. acquire the region lock
this.updatesLock.readLock().lock();
// 4. Get a mvcc write number
MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
Integer lid = null;
try {
int i = 0;
// 5. Perform the actual mutations
for (Mutation m : rm.getMutations()) {
if (m instanceof Put) {
internalPut((Put) m, HConstants.DEFAULT_CLUSTER_ID,
m.getWriteToWAL(), w, walEdits.get(i));
} else if (m instanceof Delete) {
Delete d = (Delete) m;
prepareDelete(d);
internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, d.getWriteToWAL(),
w, walEdits.get(i));
} else {
throw new DoNotRetryIOException(
"Action must be Put or Delete. But was: "
+ m.getClass().getName());
}
i++;
}
return i;
} finally {
// 6. roll mvcc forward
mvcc.completeMemstoreInsert(w);
// 7. release region lock
this.updatesLock.readLock().unlock();
try {
// 8. run all coprocessor post hooks
if (coprocessorHost != null) {
int i = 0;
for (Mutation m : rm.getMutations()) {
if (m instanceof Put) {
coprocessorHost.postPut((Put) m, walEdits.get(i),
m.getWriteToWAL());
} else if (m instanceof Delete) {
coprocessorHost.postDelete((Delete) m, walEdits.get(i),
m.getWriteToWAL());
// 1. run all pre-hooks before the atomic operation
// if any pre hook indicates "bypass", bypass the entire operation
// one WALEdit is used for all edits.
WALEdit walEdit = new WALEdit();
if (coprocessorHost != null) {
for (Mutation m : rm.getMutations()) {
if (m instanceof Put) {
if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) {
// by pass everything
return;
}
} else if (m instanceof Delete) {
Delete d = (Delete) m;
prepareDelete(d);
if (coprocessorHost.preDelete(d, walEdit, d.getWriteToWAL())) {
// by pass everything
return;
}
i++;
}
}
} finally {
if (lid != null) {
// 9. release the row lock
releaseRowLock(lid);
}
closeRegionOperation();
}
// 2. acquire the row lock
lid = getLock(lockid, rm.getRow(), true);
// 3. acquire the region lock
this.updatesLock.readLock().lock();
// 4. Get a mvcc write number
MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
long now = EnvironmentEdgeManager.currentTimeMillis();
byte[] byteNow = Bytes.toBytes(now);
try {
// 5. Check mutations and apply edits to a single WALEdit
for (Mutation m : rm.getMutations()) {
if (m instanceof Put) {
Map<byte[], List<KeyValue>> familyMap = m.getFamilyMap();
checkFamilies(familyMap.keySet());
checkTimestamps(familyMap, now);
updateKVTimestamps(familyMap.values(), byteNow);
} else if (m instanceof Delete) {
Delete d = (Delete) m;
prepareDelete(d);
prepareDeleteTimestamps(d, byteNow);
} else {
throw new DoNotRetryIOException(
"Action must be Put or Delete. But was: "
+ m.getClass().getName());
}
if (m.getWriteToWAL()) {
addFamilyMapToWALEdit(m.getFamilyMap(), walEdit);
}
}
// 6. append/sync all edits at once
// TODO: Do batching as in doMiniBatchPut
this.log.append(regionInfo, this.htableDescriptor.getName(), walEdit,
HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor);
// 7. apply to memstore
long addedSize = 0;
for (Mutation m : rm.getMutations()) {
addedSize += applyFamilyMapToMemstore(m.getFamilyMap(), w);
}
flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
} finally {
// 8. roll mvcc forward
mvcc.completeMemstoreInsert(w);
// 9. release region lock
this.updatesLock.readLock().unlock();
}
// 10. run all coprocessor post hooks, after region lock is released
if (coprocessorHost != null) {
for (Mutation m : rm.getMutations()) {
if (m instanceof Put) {
coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
} else if (m instanceof Delete) {
coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL());
}
}
}
} finally {
if (lid != null) {
// 11. release the row lock
releaseRowLock(lid);
}
if (flush) {
// 12. Flush cache if needed. Do it outside update lock.
requestFlush();
}
closeRegionOperation();
}
}

View File

@ -23,11 +23,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
@ -94,62 +94,39 @@ public class ReplicationSink {
// to the same table.
try {
long totalReplicated = 0;
// Map of table => list of puts, we only want to flushCommits once per
// Map of table => list of Rows, we only want to flushCommits once per
// invocation of this method per table.
Map<byte[], List<Put>> puts = new TreeMap<byte[], List<Put>>(Bytes.BYTES_COMPARATOR);
Map<byte[], List<Row>> rows = new TreeMap<byte[], List<Row>>(Bytes.BYTES_COMPARATOR);
for (HLog.Entry entry : entries) {
WALEdit edit = entry.getEdit();
byte[] table = entry.getKey().getTablename();
Put put = null;
Delete del = null;
KeyValue lastKV = null;
List<KeyValue> kvs = edit.getKeyValues();
if (kvs.get(0).isDelete()) {
Delete delete = new Delete(kvs.get(0).getRow(),
kvs.get(0).getTimestamp(), null);
delete.setClusterId(entry.getKey().getClusterId());
for (KeyValue kv : kvs) {
switch (Type.codeToType(kv.getType())) {
case DeleteFamily:
// family marker
delete.deleteFamily(kv.getFamily(), kv.getTimestamp());
break;
case DeleteColumn:
// column marker
delete.deleteColumns(kv.getFamily(), kv.getQualifier(),
kv.getTimestamp());
break;
case Delete:
// version marker
delete.deleteColumn(kv.getFamily(), kv.getQualifier(),
kv.getTimestamp());
break;
}
}
delete(entry.getKey().getTablename(), delete);
} else {
byte[] table = entry.getKey().getTablename();
List<Put> tableList = puts.get(table);
if (tableList == null) {
tableList = new ArrayList<Put>();
puts.put(table, tableList);
}
// With mini-batching, we need to expect multiple rows per edit
byte[] lastKey = kvs.get(0).getRow();
Put put = new Put(lastKey, kvs.get(0).getTimestamp());
put.setClusterId(entry.getKey().getClusterId());
for (KeyValue kv : kvs) {
byte[] key = kv.getRow();
if (!Bytes.equals(lastKey, key)) {
tableList.add(put);
put = new Put(key, kv.getTimestamp());
for (KeyValue kv : kvs) {
if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) {
if (kv.isDelete()) {
del = new Delete(kv.getRow());
del.setClusterId(entry.getKey().getClusterId());
addToMultiMap(rows, table, del);
} else {
put = new Put(kv.getRow());
put.setClusterId(entry.getKey().getClusterId());
addToMultiMap(rows, table, put);
}
put.add(kv);
lastKey = key;
}
tableList.add(put);
if (kv.isDelete()) {
del.addDeleteMarker(kv);
} else {
put.add(kv);
}
lastKV = kv;
}
totalReplicated++;
}
for(byte [] table : puts.keySet()) {
put(table, puts.get(table));
for(byte [] table : rows.keySet()) {
batch(table, rows.get(table));
}
this.metrics.setAgeOfLastAppliedOp(
entries[entries.length-1].getKey().getWriteTime());
@ -162,39 +139,40 @@ public class ReplicationSink {
}
/**
* Do the puts and handle the pool
* Simple helper to a map from key to (a list of) values
* TODO: Make a general utility method
* @param map
* @param key
* @param value
* @return
*/
private <K, V> List<V> addToMultiMap(Map<K, List<V>> map, K key, V value) {
List<V> values = map.get(key);
if (values == null) {
values = new ArrayList<V>();
map.put(key, values);
}
values.add(value);
return values;
}
/**
* Do the changes and handle the pool
* @param tableName table to insert into
* @param puts list of puts
* @param rows list of actions
* @throws IOException
*/
private void put(byte[] tableName, List<Put> puts) throws IOException {
if (puts.isEmpty()) {
private void batch(byte[] tableName, List<Row> rows) throws IOException {
if (rows.isEmpty()) {
return;
}
HTableInterface table = null;
try {
table = this.pool.getTable(tableName);
table.put(puts);
this.metrics.appliedOpsRate.inc(puts.size());
} finally {
if (table != null) {
table.close();
}
}
}
/**
* Do the delete and handle the pool
* @param tableName table to delete in
* @param delete the delete to use
* @throws IOException
*/
private void delete(byte[] tableName, Delete delete) throws IOException {
HTableInterface table = null;
try {
table = this.pool.getTable(tableName);
table.delete(delete);
this.metrics.appliedOpsRate.inc(1);
table.batch(rows);
this.metrics.appliedOpsRate.inc(rows.size());
} catch (InterruptedException ix) {
throw new IOException(ix);
} finally {
if (table != null) {
table.close();

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -4046,7 +4047,6 @@ public class TestFromClientSide {
Bytes.toBytes("a"), Bytes.toBytes("b")
};
RowMutation arm = new RowMutation(ROW);
arm.add(new Delete(ROW));
Put p = new Put(ROW);
p.add(FAMILY, QUALIFIERS[0], VALUE);
arm.add(p);
@ -4054,15 +4054,19 @@ public class TestFromClientSide {
Get g = new Get(ROW);
Result r = t.get(g);
// delete was first, row should exist
assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0])));
arm = new RowMutation(ROW);
p = new Put(ROW);
p.add(FAMILY, QUALIFIERS[1], VALUE);
arm.add(p);
arm.add(new Delete(ROW));
Delete d = new Delete(ROW);
d.deleteColumns(FAMILY, QUALIFIERS[0]);
arm.add(d);
t.batch(Arrays.asList((Row)arm));
r = t.get(g);
assertTrue(r.isEmpty());
assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1])));
assertNull(r.getValue(FAMILY, QUALIFIERS[0]));
}
@Test

View File

@ -333,6 +333,8 @@ public class TestAtomicOperation extends HBaseTestCase {
}
} catch (IOException e) {
e.printStackTrace();
failures.incrementAndGet();
fail();
}
}
}