HBASE-3584 Allow atomic put/delete in one call (Lars H)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1231441 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
larsh 2012-01-14 04:59:44 +00:00
parent a9c183ff98
commit 323d17d3ae
12 changed files with 483 additions and 28 deletions

View File

@ -748,6 +748,20 @@ public class HTable implements HTableInterface {
} }
} }
/**
* {@inheritDoc}
*/
@Override
public void mutateRow(final RowMutation rm) throws IOException {
new ServerCallable<Void>(connection, tableName, rm.getRow(),
operationTimeout) {
public Void call() throws IOException {
server.mutateRow(location.getRegionInfo().getRegionName(), rm);
return null;
}
}.withRetries();
}
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */

View File

@ -264,6 +264,16 @@ public interface HTableInterface extends Closeable {
boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
byte[] value, Delete delete) throws IOException; byte[] value, Delete delete) throws IOException;
/**
* Performs multiple mutations atomically on a single row. Currently
* {@link Put} and {@link Delete} are supported.
*
* @param arm object that specifies the set of mutations to perform
* atomically
* @throws IOException
*/
public void mutateRow(final RowMutation rm) throws IOException;
/** /**
* Appends values to one or more columns within a single row. * Appends values to one or more columns within a single row.
* <p> * <p>

View File

@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Performs multiple mutations atomically on a single row.
* Currently {@link Put} and {@link Delete} are supported.
*
* The mutations are performed in the order in which they
* were added.
*/
public class RowMutation implements Row {
private List<Mutation> mutations = new ArrayList<Mutation>();
private byte [] row;
private static final byte VERSION = (byte)0;
/** Constructor for Writable. DO NOT USE */
public RowMutation() {}
/**
* Create an atomic mutation for the specified row.
* @param row row key
*/
public RowMutation(byte [] row) {
if(row == null || row.length > HConstants.MAX_ROW_LENGTH) {
throw new IllegalArgumentException("Row key is invalid");
}
this.row = Arrays.copyOf(row, row.length);
}
/**
* Add a {@link Put} operation to the list of mutations
* @param p The {@link Put} to add
* @throws IOException
*/
public void add(Put p) throws IOException {
internalAdd(p);
}
/**
* Add a {@link Delete} operation to the list of mutations
* @param d The {@link Delete} to add
* @throws IOException
*/
public void add(Delete d) throws IOException {
internalAdd(d);
}
private void internalAdd(Mutation m) throws IOException {
int res = Bytes.compareTo(this.row, m.getRow());
if(res != 0) {
throw new IOException("The row in the recently added Put/Delete " +
Bytes.toStringBinary(m.getRow()) + " doesn't match the original one " +
Bytes.toStringBinary(this.row));
}
mutations.add(m);
}
@Override
public void readFields(final DataInput in) throws IOException {
int version = in.readByte();
if (version > VERSION) {
throw new IOException("version not supported");
}
this.row = Bytes.readByteArray(in);
int numMutations = in.readInt();
mutations.clear();
for(int i = 0; i < numMutations; i++) {
mutations.add((Mutation) HbaseObjectWritable.readObject(in, null));
}
}
@Override
public void write(final DataOutput out) throws IOException {
out.writeByte(VERSION);
Bytes.writeByteArray(out, this.row);
out.writeInt(mutations.size());
for (Mutation m : mutations) {
HbaseObjectWritable.writeObject(out, m, m.getClass(), null);
}
}
@Override
public int compareTo(Row i) {
return Bytes.compareTo(this.getRow(), i.getRow());
}
@Override
public byte[] getRow() {
return row;
}
/**
* @return An unmodifiable list of the current mutations.
*/
public List<Mutation> getMutations() {
return Collections.unmodifiableList(mutations);
}
}

View File

@ -501,6 +501,11 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
byte[] row) { byte[] row) {
return table.coprocessorProxy(protocol, row); return table.coprocessorProxy(protocol, row);
} }
@Override
public void mutateRow(RowMutation rm) throws IOException {
table.mutateRow(rm);
}
} }
/** The coprocessor */ /** The coprocessor */

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.RowMutation;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Increment;
@ -236,6 +237,8 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur
addToMap(RegionOpeningState.class, code++); addToMap(RegionOpeningState.class, code++);
addToMap(Append.class, code++); addToMap(Append.class, code++);
addToMap(RowMutation.class, code++);
} }
private Class<?> declaredClass; private Class<?> declaredClass;

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.RowMutation;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Increment;
@ -262,6 +263,9 @@ public interface HRegionInterface extends VersionedProtocol, Stoppable, Abortabl
byte [] family, byte [] qualifier, long amount, boolean writeToWAL) byte [] family, byte [] qualifier, long amount, boolean writeToWAL)
throws IOException; throws IOException;
public void mutateRow(byte[] regionName, RowMutation rm)
throws IOException;
/** /**
* Appends values to one or more columns values in a row. Optionally * Appends values to one or more columns values in a row. Optionally
* Returns the updated keys after the append. * Returns the updated keys after the append.

View File

@ -77,10 +77,12 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.RowMutation;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.Row;
@ -1684,7 +1686,7 @@ public class HRegion implements HeapSize { // , Writable{
try { try {
// All edits for the given row (across all column families) must happen atomically. // All edits for the given row (across all column families) must happen atomically.
prepareDelete(delete); prepareDelete(delete);
internalDelete(delete, delete.getClusterId(), writeToWAL); internalDelete(delete, delete.getClusterId(), writeToWAL, null, null);
} finally { } finally {
if(lockid == null) releaseRowLock(lid); if(lockid == null) releaseRowLock(lid);
} }
@ -1705,21 +1707,26 @@ public class HRegion implements HeapSize { // , Writable{
delete.setFamilyMap(familyMap); delete.setFamilyMap(familyMap);
delete.setClusterId(clusterId); delete.setClusterId(clusterId);
delete.setWriteToWAL(writeToWAL); delete.setWriteToWAL(writeToWAL);
internalDelete(delete, clusterId, writeToWAL); internalDelete(delete, clusterId, writeToWAL, null, null);
} }
/** /**
* @param delete The Delete command
* @param familyMap map of family to edits for the given family. * @param familyMap map of family to edits for the given family.
* @param writeToWAL * @param writeToWAL
* @param writeEntry Optional mvcc write point to use
* @param walEdit Optional walEdit to use. A non-null walEdit indicates
* that the coprocessor hooks are run by the caller
* @throws IOException * @throws IOException
*/ */
private void internalDelete(Delete delete, UUID clusterId, private void internalDelete(Delete delete, UUID clusterId,
boolean writeToWAL) throws IOException { boolean writeToWAL, MultiVersionConsistencyControl.WriteEntry writeEntry,
WALEdit walEdit) throws IOException {
Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap(); Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap();
WALEdit walEdit = new WALEdit(); WALEdit localWalEdit = walEdit == null ? new WALEdit() : walEdit;
/* Run coprocessor pre hook outside of locks to avoid deadlock */ /* Run coprocessor pre hook outside of locks to avoid deadlock */
if (coprocessorHost != null) { if (coprocessorHost != null && walEdit == null) {
if (coprocessorHost.preDelete(delete, walEdit, writeToWAL)) { if (coprocessorHost.preDelete(delete, localWalEdit, writeToWAL)) {
return; return;
} }
} }
@ -1783,23 +1790,22 @@ public class HRegion implements HeapSize { // , Writable{
// //
// bunch up all edits across all column families into a // bunch up all edits across all column families into a
// single WALEdit. // single WALEdit.
addFamilyMapToWALEdit(familyMap, walEdit); addFamilyMapToWALEdit(familyMap, localWalEdit);
this.log.append(regionInfo, this.htableDescriptor.getName(), this.log.append(regionInfo, this.htableDescriptor.getName(),
walEdit, clusterId, now, this.htableDescriptor); localWalEdit, clusterId, now, this.htableDescriptor);
} }
// Now make changes to the memstore. // Now make changes to the memstore.
long addedSize = applyFamilyMapToMemstore(familyMap, null); long addedSize = applyFamilyMapToMemstore(familyMap, writeEntry);
flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize)); flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
if (coprocessorHost != null) {
coprocessorHost.postDelete(delete, walEdit, writeToWAL);
}
} finally { } finally {
this.updatesLock.readLock().unlock(); this.updatesLock.readLock().unlock();
} }
// do after lock // do after lock
if (coprocessorHost != null && walEdit == null) {
coprocessorHost.postDelete(delete, localWalEdit, writeToWAL);
}
final long after = EnvironmentEdgeManager.currentTimeMillis(); final long after = EnvironmentEdgeManager.currentTimeMillis();
final String metricPrefix = SchemaMetrics.generateSchemaMetricsPrefix( final String metricPrefix = SchemaMetrics.generateSchemaMetricsPrefix(
getTableDesc().getNameAsString(), familyMap.keySet()); getTableDesc().getNameAsString(), familyMap.keySet());
@ -1870,7 +1876,7 @@ public class HRegion implements HeapSize { // , Writable{
try { try {
// All edits for the given row (across all column families) must happen atomically. // All edits for the given row (across all column families) must happen atomically.
internalPut(put, put.getClusterId(), writeToWAL); internalPut(put, put.getClusterId(), writeToWAL, null, null);
} finally { } finally {
if(lockid == null) releaseRowLock(lid); if(lockid == null) releaseRowLock(lid);
} }
@ -2299,11 +2305,13 @@ public class HRegion implements HeapSize { // , Writable{
// originating cluster. A slave cluster receives the result as a Put // originating cluster. A slave cluster receives the result as a Put
// or Delete // or Delete
if (isPut) { if (isPut) {
internalPut(((Put)w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL); internalPut(((Put) w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL,
null, null);
} else { } else {
Delete d = (Delete)w; Delete d = (Delete)w;
prepareDelete(d); prepareDelete(d);
internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL); internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL, null,
null);
} }
return true; return true;
} }
@ -2398,7 +2406,7 @@ public class HRegion implements HeapSize { // , Writable{
p.setFamilyMap(familyMap); p.setFamilyMap(familyMap);
p.setClusterId(HConstants.DEFAULT_CLUSTER_ID); p.setClusterId(HConstants.DEFAULT_CLUSTER_ID);
p.setWriteToWAL(true); p.setWriteToWAL(true);
this.internalPut(p, HConstants.DEFAULT_CLUSTER_ID, true); this.internalPut(p, HConstants.DEFAULT_CLUSTER_ID, true, null, null);
} }
/** /**
@ -2406,15 +2414,18 @@ public class HRegion implements HeapSize { // , Writable{
* Warning: Assumption is caller has lock on passed in row. * Warning: Assumption is caller has lock on passed in row.
* @param put The Put command * @param put The Put command
* @param writeToWAL if true, then we should write to the log * @param writeToWAL if true, then we should write to the log
* @param writeEntry Optional mvcc write point to use
* @param walEdit Optional walEdit to use. A non-null walEdit indicates
* that the coprocessor hooks are run by the caller
* @throws IOException * @throws IOException
*/ */
private void internalPut(Put put, UUID clusterId, private void internalPut(Put put, UUID clusterId, boolean writeToWAL,
boolean writeToWAL) throws IOException { MultiVersionConsistencyControl.WriteEntry writeEntry, WALEdit walEdit) throws IOException {
Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap(); Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
WALEdit walEdit = new WALEdit(); WALEdit localWalEdit = walEdit == null ? new WALEdit() : walEdit;
/* run pre put hook outside of lock to avoid deadlock */ /* run pre put hook outside of lock to avoid deadlock */
if (coprocessorHost != null) { if (coprocessorHost != null && walEdit == null) {
if (coprocessorHost.prePut(put, walEdit, writeToWAL)) { if (coprocessorHost.prePut(put, localWalEdit, writeToWAL)) {
return; return;
} }
} }
@ -2434,19 +2445,19 @@ public class HRegion implements HeapSize { // , Writable{
// for some reason fail to write/sync to commit log, the memstore // for some reason fail to write/sync to commit log, the memstore
// will contain uncommitted transactions. // will contain uncommitted transactions.
if (writeToWAL) { if (writeToWAL) {
addFamilyMapToWALEdit(familyMap, walEdit); addFamilyMapToWALEdit(familyMap, localWalEdit);
this.log.append(regionInfo, this.htableDescriptor.getName(), this.log.append(regionInfo, this.htableDescriptor.getName(),
walEdit, clusterId, now, this.htableDescriptor); localWalEdit, clusterId, now, this.htableDescriptor);
} }
long addedSize = applyFamilyMapToMemstore(familyMap, null); long addedSize = applyFamilyMapToMemstore(familyMap, writeEntry);
flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize)); flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
} finally { } finally {
this.updatesLock.readLock().unlock(); this.updatesLock.readLock().unlock();
} }
if (coprocessorHost != null) { if (coprocessorHost != null && walEdit == null) {
coprocessorHost.postPut(put, walEdit, writeToWAL); coprocessorHost.postPut(put, localWalEdit, writeToWAL);
} }
// do after lock // do after lock
@ -4129,6 +4140,95 @@ public class HRegion implements HeapSize { // , Writable{
return results; return results;
} }
public int mutateRow(RowMutation rm,
Integer lockid) throws IOException {
startRegionOperation();
List<WALEdit> walEdits = new ArrayList<WALEdit>(rm.getMutations().size());
// 1. run all pre-hooks before the atomic operation
// if any pre hook indicates "bypass", bypass the entire operation
// Note that this requires creating the WALEdits here and passing
// them to the actual Put/Delete operations.
for (Mutation m : rm.getMutations()) {
WALEdit walEdit = new WALEdit();
walEdits.add(walEdit);
if (coprocessorHost == null) {
continue;
}
if (m instanceof Put) {
if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) {
// by pass everything
return 0;
}
} else if (m instanceof Delete) {
Delete d = (Delete) m;
prepareDelete(d);
if (coprocessorHost.preDelete(d, walEdit, d.getWriteToWAL())) {
// by pass everything
return 0;
}
}
}
// 2. acquire the row lock
Integer lid = getLock(lockid, rm.getRow(), true);
// 3. acquire the region lock
this.updatesLock.readLock().lock();
// 4. Get a mvcc write number
MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
try {
int i = 0;
// 5. Perform the actual mutations
for (Mutation m : rm.getMutations()) {
if (m instanceof Put) {
internalPut((Put) m, HConstants.DEFAULT_CLUSTER_ID,
m.getWriteToWAL(), w, walEdits.get(i));
} else if (m instanceof Delete) {
Delete d = (Delete) m;
prepareDelete(d);
internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, d.getWriteToWAL(),
w, walEdits.get(i));
} else {
throw new DoNotRetryIOException(
"Action must be Put or Delete. But was: "
+ m.getClass().getName());
}
i++;
}
return i;
} finally {
// 6. roll mvcc forward
mvcc.completeMemstoreInsert(w);
// 7. release region lock
this.updatesLock.readLock().unlock();
try {
// 8. run all coprocessor post hooks
if (coprocessorHost != null) {
int i = 0;
for (Mutation m : rm.getMutations()) {
if (m instanceof Put) {
coprocessorHost.postPut((Put) m, walEdits.get(i),
m.getWriteToWAL());
} else if (m instanceof Delete) {
coprocessorHost.postDelete((Delete) m, walEdits.get(i),
m.getWriteToWAL());
}
i++;
}
}
} finally {
if (lid != null) {
// 9. release the row lock
releaseRowLock(lid);
}
closeRegionOperation();
}
}
}
// TODO: There's a lot of boiler plate code identical // TODO: There's a lot of boiler plate code identical
// to increment... See how to better unify that. // to increment... See how to better unify that.
/** /**

View File

@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.catalog.RootLocationEditor; import org.apache.hadoop.hbase.catalog.RootLocationEditor;
import org.apache.hadoop.hbase.client.Action; import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.RowMutation;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HConnectionManager;
@ -3151,6 +3152,27 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
return null; return null;
} }
@Override
public void mutateRow(byte[] regionName, RowMutation rm)
throws IOException {
checkOpen();
if (regionName == null) {
throw new IOException("Invalid arguments to atomicMutation " +
"regionName is null");
}
requestCount.incrementAndGet();
try {
HRegion region = getRegion(regionName);
if (!region.getRegionInfo().isMetaTable()) {
this.cacheFlusher.reclaimMemStoreMemory();
}
region.mutateRow(rm, null);
} catch (IOException e) {
checkFileSystem();
throw e;
}
}
@Override @Override
public Result append(byte[] regionName, Append append) public Result append(byte[] regionName, Append append)
throws IOException { throws IOException {
@ -3296,6 +3318,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
} else if (action instanceof Append) { } else if (action instanceof Append) {
response.add(regionName, originalIndex, response.add(regionName, originalIndex,
append(regionName, (Append)action)); append(regionName, (Append)action));
} else if (action instanceof RowMutation) {
mutateRow(regionName, (RowMutation)action);
response.add(regionName, originalIndex, new Result());
} else { } else {
LOG.debug("Error: invalid Action, row must be a Get, Delete, " + LOG.debug("Error: invalid Action, row must be a Get, Delete, " +
"Put, Exec, Increment, or Append."); "Put, Exec, Increment, or Append.");

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.RowMutation;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTableInterface;
@ -647,4 +648,9 @@ public class RemoteHTable implements HTableInterface {
throws IOException, Throwable { throws IOException, Throwable {
throw new UnsupportedOperationException("coprocessorExec not implemented"); throw new UnsupportedOperationException("coprocessorExec not implemented");
} }
@Override
public void mutateRow(RowMutation rm) throws IOException {
throw new IOException("atomicMutation not supported");
}
} }

View File

@ -34,6 +34,8 @@ import java.io.FileInputStream;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -4035,6 +4037,34 @@ public class TestFromClientSide {
assertTrue(scan.getFamilyMap().containsKey(FAMILY)); assertTrue(scan.getFamilyMap().containsKey(FAMILY));
} }
@Test
public void testRowMutation() throws Exception {
LOG.info("Starting testRowMutation");
final byte [] TABLENAME = Bytes.toBytes("testRowMutation");
HTable t = TEST_UTIL.createTable(TABLENAME, FAMILY);
byte [][] QUALIFIERS = new byte [][] {
Bytes.toBytes("a"), Bytes.toBytes("b")
};
RowMutation arm = new RowMutation(ROW);
arm.add(new Delete(ROW));
Put p = new Put(ROW);
p.add(FAMILY, QUALIFIERS[0], VALUE);
arm.add(p);
t.mutateRow(arm);
Get g = new Get(ROW);
Result r = t.get(g);
// delete was first, row should exist
assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0])));
arm = new RowMutation(ROW);
arm.add(p);
arm.add(new Delete(ROW));
t.batch(Arrays.asList((Row)arm));
r = t.get(g);
assertTrue(r.isEmpty());
}
@Test @Test
public void testAppend() throws Exception { public void testAppend() throws Exception {
LOG.info("Starting testAppend"); LOG.info("Starting testAppend");

View File

@ -131,6 +131,41 @@ public class TestRegionObserverInterface {
table.close(); table.close();
} }
@Test
public void testRowMutation() throws IOException {
byte[] tableName = TEST_TABLE;
HTable table = util.createTable(tableName, new byte[][] {A, B, C});
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadDeleted"},
TEST_TABLE,
new Boolean[] {false, false, false, false, false});
Put put = new Put(ROW);
put.add(A, A, A);
put.add(B, B, B);
put.add(C, C, C);
Delete delete = new Delete(ROW);
delete.deleteColumn(A, A);
delete.deleteColumn(B, B);
delete.deleteColumn(C, C);
RowMutation arm = new RowMutation(ROW);
arm.add(put);
arm.add(delete);
table.mutateRow(arm);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadDeleted"},
TEST_TABLE,
new Boolean[] {false, false, true, true, true}
);
util.deleteTable(tableName);
table.close();
}
@Test @Test
public void testIncrementHook() throws IOException { public void testIncrementHook() throws IOException {
byte[] tableName = TEST_TABLE; byte[] tableName = TEST_TABLE;

View File

@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException; import java.io.IOException;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -26,10 +29,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.RowMutation;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@ -239,7 +245,98 @@ public class TestAtomicOperation extends HBaseTestCase {
} }
} }
/**
* Test multi-threaded increments.
*/
public void testRowMutationMultiThreads() throws IOException {
LOG.info("Starting test testMutationMultiThreads");
initHRegion(tableName, getName(), fam1);
// create 100 threads, each will alternate between adding and
// removing a column
int numThreads = 100;
int opsPerThread = 1000;
AtomicOperation[] all = new AtomicOperation[numThreads];
AtomicLong timeStamps = new AtomicLong(0);
AtomicInteger failures = new AtomicInteger(0);
// create all threads
for (int i = 0; i < numThreads; i++) {
all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures);
}
// run all threads
for (int i = 0; i < numThreads; i++) {
all[i].start();
}
// wait for all threads to finish
for (int i = 0; i < numThreads; i++) {
try {
all[i].join();
} catch (InterruptedException e) {
}
}
assertEquals(0, failures.get());
}
public static class AtomicOperation extends Thread {
private final HRegion region;
private final int numOps;
private final AtomicLong timeStamps;
private final AtomicInteger failures;
private final Random r = new Random();
public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps, AtomicInteger failures) {
this.region = region;
this.numOps = numOps;
this.timeStamps = timeStamps;
this.failures = failures;
}
@Override
public void run() {
boolean op = true;
for (int i=0; i<numOps; i++) {
try {
// throw in some flushes
if (r.nextFloat() < 0.001) {
LOG.debug("flushing");
region.flushcache();
}
long ts = timeStamps.incrementAndGet();
RowMutation arm = new RowMutation(row);
if (op) {
Put p = new Put(row, ts);
p.add(fam1, qual1, value1);
arm.add(p);
Delete d = new Delete(row);
d.deleteColumns(fam1, qual2, ts);
arm.add(d);
} else {
Delete d = new Delete(row);
d.deleteColumns(fam1, qual1, ts);
arm.add(d);
Put p = new Put(row, ts);
p.add(fam1, qual2, value2);
arm.add(p);
}
region.mutateRow(arm, null);
op ^= true;
// check: should always see exactly one column
Get g = new Get(row);
Result r = region.get(g, null);
if (r.size() != 1) {
LOG.debug(r);
failures.incrementAndGet();
fail();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
@org.junit.Rule @org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =