HBASE-4102 atomicAppend: A put that appends to the latest version of a cell
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1182588 13f79535-47bb-0310-9956-ffa450edef68
parent c2681355b5
commit 202118d3b5
@@ -14,6 +14,7 @@ Release 0.93.0 - Unreleased
   HBASE-4465  Lazy-seek optimization for StoreFile scanners (mikhail/liyin)
   HBASE-4422  Move block cache parameters and references into single
               CacheConf class (jgray)
   HBASE-4102  atomicAppend: A put that appends to the latest version of a cell (Lars H)

 BUG FIXES
   HBASE-4488  Store could miss rows during flush (Lars H via jgray)
@@ -405,6 +405,82 @@ public class KeyValue implements Writable, HeapSize {
     this.offset = 0;
   }

   /**
    * Constructs an empty KeyValue structure, with specified sizes.
    * This can be used to partially fill up KeyValues.
    * <p>
    * Column is split into two fields, family and qualifier.
    * @param rlength row length
    * @param flength family length
    * @param qlength qualifier length
    * @param timestamp version timestamp
    * @param type key type
    * @param vlength value length
    * @throws IllegalArgumentException
    */
   public KeyValue(final int rlength,
       final int flength,
       final int qlength,
       final long timestamp, final Type type,
       final int vlength) {
     this.bytes = createEmptyByteArray(rlength,
         flength, qlength,
         timestamp, type, vlength);
     this.length = bytes.length;
     this.offset = 0;
   }

   /**
    * Create an empty byte[] representing a KeyValue.
    * All lengths are preset and can be filled in later.
    * @param rlength
    * @param flength
    * @param qlength
    * @param timestamp
    * @param type
    * @param vlength
    * @return The newly created byte array.
    */
   static byte[] createEmptyByteArray(final int rlength, int flength,
       int qlength, final long timestamp, final Type type, int vlength) {
     if (rlength > Short.MAX_VALUE) {
       throw new IllegalArgumentException("Row > " + Short.MAX_VALUE);
     }
     if (flength > Byte.MAX_VALUE) {
       throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
     }
     // Qualifier length
     if (qlength > Integer.MAX_VALUE - rlength - flength) {
       throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE);
     }
     // Key length
     long longkeylength = KEY_INFRASTRUCTURE_SIZE + rlength + flength + qlength;
     if (longkeylength > Integer.MAX_VALUE) {
       throw new IllegalArgumentException("keylength " + longkeylength + " > " +
         Integer.MAX_VALUE);
     }
     int keylength = (int)longkeylength;
     // Value length
     if (vlength > HConstants.MAXIMUM_VALUE_LENGTH) { // FindBugs INT_VACUOUS_COMPARISON
       throw new IllegalArgumentException("Value > " +
           HConstants.MAXIMUM_VALUE_LENGTH);
     }

     // Allocate right-sized byte array.
     byte [] bytes = new byte[KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength];
     // Write the correct size markers
     int pos = 0;
     pos = Bytes.putInt(bytes, pos, keylength);
     pos = Bytes.putInt(bytes, pos, vlength);
     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
     pos += rlength;
     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
     pos += flength + qlength;
     pos = Bytes.putLong(bytes, pos, timestamp);
     pos = Bytes.putByte(bytes, pos, type.getCode());
     return bytes;
   }

   /**
    * Write KeyValue format into a byte array.
    *
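The size-only constructor above exists so a caller can preallocate a KeyValue for a concatenated value and then copy bytes into its backing buffer, which is what HRegion.append does later in this commit. The following is a minimal sketch of that pattern, not part of the patch; it assumes org.apache.hadoop.hbase.KeyValue and org.apache.hadoop.hbase.util.Bytes are imported and runs inside some method, and the row/value literals are invented for illustration.

    // Existing cell value and the delta to append to it (illustrative data only).
    KeyValue oldKv = new KeyValue(Bytes.toBytes("r1"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), 1L, Bytes.toBytes("Hello"));
    KeyValue delta = new KeyValue(Bytes.toBytes("r1"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), 2L, Bytes.toBytes(", World"));

    // Allocate one empty KeyValue sized for the concatenated value; the length
    // fields, timestamp and type are already written by the new constructor.
    KeyValue newKV = new KeyValue(oldKv.getRowLength(), delta.getFamilyLength(),
        delta.getQualifierLength(), 2L, KeyValue.Type.Put,
        oldKv.getValueLength() + delta.getValueLength());

    // Fill in row, family and qualifier from the delta ...
    System.arraycopy(delta.getBuffer(), delta.getRowOffset(),
        newKV.getBuffer(), newKV.getRowOffset(), delta.getRowLength());
    System.arraycopy(delta.getBuffer(), delta.getFamilyOffset(),
        newKV.getBuffer(), newKV.getFamilyOffset(), delta.getFamilyLength());
    System.arraycopy(delta.getBuffer(), delta.getQualifierOffset(),
        newKV.getBuffer(), newKV.getQualifierOffset(), delta.getQualifierLength());
    // ... then the old value followed by the appended value: each byte is copied once.
    System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(),
        newKV.getBuffer(), newKV.getValueOffset(), oldKv.getValueLength());
    System.arraycopy(delta.getBuffer(), delta.getValueOffset(),
        newKV.getBuffer(), newKV.getValueOffset() + oldKv.getValueLength(),
        delta.getValueLength());
    // newKV now holds row "r1", family "f", qualifier "q", value "Hello, World".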
@@ -0,0 +1,154 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;

/**
 * Performs Append operations on a single row.
 * <p>
 * Note that this operation does not appear atomic to readers. Appends are done
 * under a single row lock, so write operations to a row are synchronized, but
 * readers do not take row locks so get and scan operations can see this
 * operation partially completed.
 * <p>
 * To append to a set of columns of a row, instantiate an Append object with the
 * row to append to. At least one column to append must be specified using the
 * {@link #add(byte[], byte[], byte[])} method.
 */
public class Append extends Mutation implements Writable {
  // TODO: refactor to derive from Put?
  private static final String RETURN_RESULTS = "_rr_";
  private static final byte APPEND_VERSION = (byte)1;

  /**
   * @param returnResults
   *          True (default) if the append operation should return the results.
   *          A client that is not interested in the result can save network
   *          bandwidth setting this to false.
   */
  public void setReturnResults(boolean returnResults) {
    setAttribute(RETURN_RESULTS, Bytes.toBytes(returnResults));
  }

  /**
   * @return current setting for returnResults
   */
  public boolean isReturnResults() {
    byte[] v = getAttribute(RETURN_RESULTS);
    return v == null ? true : Bytes.toBoolean(v);
  }

  /** Constructor for Writable.  DO NOT USE */
  public Append() {}

  /**
   * Create an Append operation for the specified row.
   * <p>
   * At least one column must be appended to.
   * @param row row key
   */
  public Append(byte[] row) {
    this.row = Arrays.copyOf(row, row.length);
  }

  /**
   * Add the specified column and value to this Append operation.
   * @param family family name
   * @param qualifier column qualifier
   * @param value value to append to specified column
   * @return this
   */
  public Append add(byte [] family, byte [] qualifier, byte [] value) {
    List<KeyValue> list = familyMap.get(family);
    if (list == null) {
      list = new ArrayList<KeyValue>();
    }
    list.add(new KeyValue(
        this.row, family, qualifier, this.ts, KeyValue.Type.Put, value));
    familyMap.put(family, list);
    return this;
  }

  @Override
  public void readFields(final DataInput in)
  throws IOException {
    int version = in.readByte();
    if (version > APPEND_VERSION) {
      throw new IOException("version not supported: " + version);
    }
    this.row = Bytes.readByteArray(in);
    this.ts = in.readLong();
    this.lockId = in.readLong();
    this.writeToWAL = in.readBoolean();
    int numFamilies = in.readInt();
    if (!this.familyMap.isEmpty()) this.familyMap.clear();
    for (int i = 0; i < numFamilies; i++) {
      byte [] family = Bytes.readByteArray(in);
      int numKeys = in.readInt();
      List<KeyValue> keys = new ArrayList<KeyValue>(numKeys);
      int totalLen = in.readInt();
      byte [] buf = new byte[totalLen];
      int offset = 0;
      for (int j = 0; j < numKeys; j++) {
        int keyLength = in.readInt();
        in.readFully(buf, offset, keyLength);
        keys.add(new KeyValue(buf, offset, keyLength));
        offset += keyLength;
      }
      this.familyMap.put(family, keys);
    }
    readAttributes(in);
  }

  @Override
  public void write(final DataOutput out)
  throws IOException {
    out.writeByte(APPEND_VERSION);
    Bytes.writeByteArray(out, this.row);
    out.writeLong(this.ts);
    out.writeLong(this.lockId);
    out.writeBoolean(this.writeToWAL);
    out.writeInt(familyMap.size());
    for (Map.Entry<byte [], List<KeyValue>> entry : familyMap.entrySet()) {
      Bytes.writeByteArray(out, entry.getKey());
      List<KeyValue> keys = entry.getValue();
      out.writeInt(keys.size());
      int totalLen = 0;
      for (KeyValue kv : keys) {
        totalLen += kv.getLength();
      }
      out.writeInt(totalLen);
      for (KeyValue kv : keys) {
        out.writeInt(kv.getLength());
        out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
      }
    }
    writeAttributes(out);
  }
}
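For reference, a short client-side usage sketch of the API this class adds. It is not part of the commit; the table name "t", the column family "f" and the Configuration object conf are assumed to already exist.

    HTable table = new HTable(conf, "t");            // conf: an existing Configuration
    Append a = new Append(Bytes.toBytes("row1"));
    a.add(Bytes.toBytes("f"), Bytes.toBytes("q1"), Bytes.toBytes("Hello"));
    a.add(Bytes.toBytes("f"), Bytes.toBytes("q2"), Bytes.toBytes("World"));
    a.setReturnResults(true);                        // the default; false saves bandwidth
    Result r = table.append(a);                      // null when returnResults is false
    byte[] q1After = r.getValue(Bytes.toBytes("f"), Bytes.toBytes("q1"));

Because appends take only the row lock, a concurrent get or scan can observe the row with only some of these columns appended, as the class javadoc above notes.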
@@ -736,6 +736,25 @@ public class HTable implements HTableInterface, Closeable {
     }
   }

   /**
    * {@inheritDoc}
    */
   @Override
   public Result append(final Append append) throws IOException {
     if (append.numFamilies() == 0) {
       throw new IOException(
           "Invalid arguments to append, no columns specified");
     }
     return connection.getRegionServerWithRetries(
         new ServerCallable<Result>(connection, tableName, append.getRow(), operationTimeout) {
           public Result call() throws IOException {
             return server.append(
                 location.getRegionInfo().getRegionName(), append);
           }
         }
     );
   }

   /**
    * {@inheritDoc}
    */
@@ -261,6 +261,21 @@ public interface HTableInterface {
   boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
       byte[] value, Delete delete) throws IOException;

   /**
    * Appends values to one or more columns within a single row.
    * <p>
    * This operation does not appear atomic to readers. Appends are done
    * under a single row lock, so write operations to a row are synchronized, but
    * readers do not take row locks so get and scan operations can see this
    * operation partially completed.
    *
    * @param append object that specifies the columns and values to be
    * appended to the row
    * @throws IOException e
    * @return values of columns after the append operation (may be null)
    */
   public Result append(final Append append) throws IOException;

   /**
    * Increments one or more columns within a single row.
    * <p>
@@ -203,4 +203,22 @@ public abstract class Mutation extends OperationWithAttributes {
     }
     return new UUID(Bytes.toLong(attr, 0), Bytes.toLong(attr, Bytes.SIZEOF_LONG));
   }

   /**
    * @return the total number of KeyValues
    */
   public int size() {
     int size = 0;
     for (List<KeyValue> kvList : this.familyMap.values()) {
       size += kvList.size();
     }
     return size;
   }

   /**
    * @return the number of different families
    */
   public int numFamilies() {
     return familyMap.size();
   }
 }
@@ -318,24 +318,6 @@ public class Put extends Mutation
     return list;
   }

   /**
    * @return the number of different families included in this put
    */
   public int numFamilies() {
     return familyMap.size();
   }

   /**
    * @return the total number of KeyValues that will be added with this put
    */
   public int size() {
     int size = 0;
     for (List<KeyValue> kvList : this.familyMap.values()) {
       size += kvList.size();
     }
     return size;
   }

   //HeapSize
   public long heapSize() {
     long heapsize = OVERHEAD;
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
@@ -184,6 +185,16 @@ public abstract class BaseRegionObserver implements RegionObserver {
     return result;
   }

   @Override
   public void preAppend(final ObserverContext<RegionCoprocessorEnvironment> e,
       final Append append, final Result result) throws IOException {
   }

   @Override
   public void postAppend(final ObserverContext<RegionCoprocessorEnvironment> e,
       final Append append, final Result result) throws IOException {
   }

   @Override
   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> e,
       final byte [] row, final byte [] family, final byte [] qualifier,
@@ -393,6 +393,11 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
           writeToWAL);
     }

     @Override
     public Result append(Append append) throws IOException {
       return table.append(append);
     }

     @Override
     public Result increment(Increment increment) throws IOException {
       return table.increment(increment);
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
@@ -457,6 +458,38 @@ public interface RegionObserver extends Coprocessor {
       final long amount, final boolean writeToWAL, final long result)
     throws IOException;

   /**
    * Called before Append
    * <p>
    * Call CoprocessorEnvironment#bypass to skip default actions
    * <p>
    * Call CoprocessorEnvironment#complete to skip any subsequent chained
    * coprocessors
    * @param c the environment provided by the region server
    * @param append Append object
    * @param result The result to return to the client if default processing
    * is bypassed. Can be modified. Will not be used if default processing
    * is not bypassed.
    * @throws IOException if an error occurred on the coprocessor
    */
   void preAppend(final ObserverContext<RegionCoprocessorEnvironment> c,
       final Append append, final Result result)
     throws IOException;

   /**
    * Called after Append
    * <p>
    * Call CoprocessorEnvironment#complete to skip any subsequent chained
    * coprocessors
    * @param c the environment provided by the region server
    * @param append Append object
    * @param result the result returned by the append, can be modified
    * @throws IOException if an error occurred on the coprocessor
    */
   void postAppend(final ObserverContext<RegionCoprocessorEnvironment> c,
       final Append append, final Result result)
     throws IOException;

   /**
    * Called before Increment
    * <p>
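A hedged sketch of how the new hooks might be used from a coprocessor follows. It is not part of the commit; the observer class name and the "readonly" family are invented for illustration, and it relies on the no-op defaults added to BaseRegionObserver earlier in this commit.

    import java.io.IOException;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.hbase.client.Append;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.util.Bytes;

    public class AppendAuditObserver extends BaseRegionObserver {
      private static final Log LOG = LogFactory.getLog(AppendAuditObserver.class);

      @Override
      public void preAppend(final ObserverContext<RegionCoprocessorEnvironment> e,
          final Append append, final Result result) throws IOException {
        // Reject appends that touch a (hypothetical) read-only family before the
        // region applies them; throwing here propagates to the client and the
        // append is never executed.
        if (append.getFamilyMap().containsKey(Bytes.toBytes("readonly"))) {
          throw new IOException("appends to family 'readonly' are not allowed");
        }
      }

      @Override
      public void postAppend(final ObserverContext<RegionCoprocessorEnvironment> e,
          final Append append, final Result result) throws IOException {
        // Log how many cells were appended to which row.
        LOG.info("appended " + append.size() + " cell(s) to row "
            + Bytes.toStringBinary(append.getRow()));
      }
    }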
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.HServerLoad;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
@@ -238,6 +239,8 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur

     addToMap(RegionOpeningState.class, code++);

     addToMap(Append.class, code++);

   }

   private Class<?> declaredClass;
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
@@ -210,6 +211,19 @@ public interface HRegionInterface extends VersionedProtocol, Stoppable, Abortabl
       byte [] family, byte [] qualifier, long amount, boolean writeToWAL)
   throws IOException;

   /**
    * Appends values to one or more column values in a row. Optionally
    * returns the updated keys after the append.
    * <p>
    * This operation does not appear atomic to readers. Appends are done
    * under a row lock but readers do not take row locks.
    * @param regionName region name
    * @param append Append operation
    * @return changed cells (may be null)
    */
   public Result append(byte[] regionName, Append append)
   throws IOException;

   /**
    * Increments one or more columns values in a row. Returns the
    * updated keys after the increment.
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
@@ -3503,6 +3504,149 @@ public class HRegion implements HeapSize { // , Writable{
     return results;
   }

   // TODO: There's a lot of boiler plate code identical
   // to increment... See how to better unify that.
   /**
    *
    * Perform one or more append operations on a row.
    * <p>
    * Appends performed are done under row lock but reads do not take locks out
    * so this can be seen partially complete by gets and scans.
    *
    * @param append
    * @param lockid
    * @param writeToWAL
    * @return new keyvalues after append
    * @throws IOException
    */
   public Result append(Append append, Integer lockid, boolean writeToWAL)
   throws IOException {
     // TODO: Use RWCC to make this set of appends atomic to reads
     byte[] row = append.getRow();
     checkRow(row, "append");
     boolean flush = false;
     WALEdit walEdits = null;
     List<KeyValue> allKVs = new ArrayList<KeyValue>(append.size());
     List<KeyValue> kvs = new ArrayList<KeyValue>(append.size());
     long now = EnvironmentEdgeManager.currentTimeMillis();
     long size = 0;
     long txid = 0;

     // Lock row
     startRegionOperation();
     this.writeRequestsCount.increment();
     try {
       Integer lid = getLock(lockid, row, true);
       this.updatesLock.readLock().lock();
       try {
         // Process each family
         for (Map.Entry<byte[], List<KeyValue>> family : append.getFamilyMap()
             .entrySet()) {

           Store store = stores.get(family.getKey());

           // Get previous values for all columns in this family
           Get get = new Get(row);
           for (KeyValue kv : family.getValue()) {
             get.addColumn(family.getKey(), kv.getQualifier());
           }
           List<KeyValue> results = get(get, false);

           // Iterate the input columns and update existing values if they were
           // found, otherwise add new column initialized to the append value

           // Avoid as much copying as possible. Every byte is copied at most
           // once.
           // Would be nice if KeyValue had scatter/gather logic
           int idx = 0;
           for (KeyValue kv : family.getValue()) {
             KeyValue newKV;
             if (idx < results.size()
                 && results.get(idx).matchingQualifier(kv.getBuffer(),
                     kv.getQualifierOffset(), kv.getQualifierLength())) {
               KeyValue oldKv = results.get(idx);
               // allocate an empty kv once
               newKV = new KeyValue(row.length, kv.getFamilyLength(),
                   kv.getQualifierLength(), now, KeyValue.Type.Put,
                   oldKv.getValueLength() + kv.getValueLength());
               // copy in the value
               System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(),
                   newKV.getBuffer(), newKV.getValueOffset(),
                   oldKv.getValueLength());
               System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
                   newKV.getBuffer(),
                   newKV.getValueOffset() + oldKv.getValueLength(),
                   kv.getValueLength());
               idx++;
             } else {
               // allocate an empty kv once
               newKV = new KeyValue(row.length, kv.getFamilyLength(),
                   kv.getQualifierLength(), now, KeyValue.Type.Put,
                   kv.getValueLength());
               // copy in the value
               System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
                   newKV.getBuffer(), newKV.getValueOffset(),
                   kv.getValueLength());
             }
             // copy in row, family, and qualifier
             System.arraycopy(kv.getBuffer(), kv.getRowOffset(),
                 newKV.getBuffer(), newKV.getRowOffset(), kv.getRowLength());
             System.arraycopy(kv.getBuffer(), kv.getFamilyOffset(),
                 newKV.getBuffer(), newKV.getFamilyOffset(),
                 kv.getFamilyLength());
             System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(),
                 newKV.getBuffer(), newKV.getQualifierOffset(),
                 kv.getQualifierLength());

             kvs.add(newKV);

             // Append update to WAL
             if (writeToWAL) {
               if (walEdits == null) {
                 walEdits = new WALEdit();
               }
               walEdits.add(newKV);
             }
           }

           // Write the KVs for this family into the store
           size += store.upsert(kvs);
           allKVs.addAll(kvs);
           kvs.clear();
         }

         // Actually write to WAL now
         if (writeToWAL) {
           // Using default cluster id, as this can only happen in the originating
           // cluster. A slave cluster receives the final value (not the delta)
           // as a Put.
           txid = this.log.appendNoSync(regionInfo,
               this.htableDescriptor.getName(), walEdits,
               HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor);
         }

         size = this.addAndGetGlobalMemstoreSize(size);
         flush = isFlushSize(size);
       } finally {
         this.updatesLock.readLock().unlock();
         releaseRowLock(lid);
       }
       if (writeToWAL) {
         this.log.sync(txid); // sync the transaction log outside the rowlock
       }
     } finally {
       closeRegionOperation();
     }

     if (flush) {
       // Request a cache flush.  Do it outside update lock.
       requestFlush();
     }

     return append.isReturnResults() ? new Result(allKVs) : null;
   }

   /**
    *
    * Perform one or more increment operations on a row.
@@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaEditor;
 import org.apache.hadoop.hbase.catalog.RootLocationEditor;
 import org.apache.hadoop.hbase.client.Action;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
@@ -124,7 +125,6 @@ import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CompressionTest;
@@ -2832,6 +2832,37 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
     return null;
   }

   @Override
   public Result append(byte[] regionName, Append append)
   throws IOException {
     checkOpen();
     if (regionName == null) {
       throw new IOException("Invalid arguments to append, " +
           "regionName is null");
     }
     requestCount.incrementAndGet();
     try {
       HRegion region = getRegion(regionName);
       Integer lock = getLockFromId(append.getLockId());
       Append appVal = append;
       Result resVal;
       if (region.getCoprocessorHost() != null) {
         resVal = region.getCoprocessorHost().preAppend(appVal);
         if (resVal != null) {
           return resVal;
         }
       }
       resVal = region.append(appVal, lock, append.getWriteToWAL());
       if (region.getCoprocessorHost() != null) {
         region.getCoprocessorHost().postAppend(appVal, resVal);
       }
       return resVal;
     } catch (IOException e) {
       checkFileSystem();
       throw e;
     }
   }

   @Override
   public Result increment(byte[] regionName, Increment increment)
   throws IOException {
@@ -946,6 +946,34 @@ public class RegionCoprocessorHost
     return result;
   }

   /**
    * @param append append object
    * @return result to return to client if default operation should be
    * bypassed, null otherwise
    * @throws IOException if an error occurred on the coprocessor
    */
   public Result preAppend(Append append)
       throws IOException {
     boolean bypass = false;
     Result result = new Result();
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     for (RegionEnvironment env: coprocessors) {
       if (env.getInstance() instanceof RegionObserver) {
         ctx = ObserverContext.createAndPrepare(env, ctx);
         try {
           ((RegionObserver)env.getInstance()).preAppend(ctx, append, result);
         } catch (Throwable e) {
           handleCoprocessorThrowable(env, e);
         }
         bypass |= ctx.shouldBypass();
         if (ctx.shouldComplete()) {
           break;
         }
       }
     }
     return bypass ? result : null;
   }

   /**
    * @param increment increment object
    * @return result to return to client if default operation should be
@@ -974,6 +1002,29 @@ public class RegionCoprocessorHost
     return bypass ? result : null;
   }

   /**
    * @param append Append object
    * @param result the result returned by postAppend
    * @throws IOException if an error occurred on the coprocessor
    */
   public void postAppend(final Append append, Result result)
       throws IOException {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     for (RegionEnvironment env: coprocessors) {
       if (env.getInstance() instanceof RegionObserver) {
         ctx = ObserverContext.createAndPrepare(env, ctx);
         try {
           ((RegionObserver)env.getInstance()).postAppend(ctx, append, result);
         } catch (Throwable e) {
           handleCoprocessorThrowable(env, e);
         }
         if (ctx.shouldComplete()) {
           break;
         }
       }
     }
   }

   /**
    * @param increment increment object
    * @param result the result returned by postIncrement
@@ -596,6 +596,10 @@ public class RemoteHTable implements HTableInterface {
     throw new IOException("Increment not supported");
   }

   public Result append(Append append) throws IOException {
     throw new IOException("Append not supported");
   }

   public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
       long amount) throws IOException {
     throw new IOException("incrementColumnValue not supported");
@@ -3913,6 +3913,29 @@ public class TestFromClientSide {
     assertTrue(scan.getFamilyMap().containsKey(FAMILY));
   }

   @Test
   public void testAppend() throws Exception {
     LOG.info("Starting testAppend");
     final byte [] TABLENAME = Bytes.toBytes("testAppend");
     HTable t = TEST_UTIL.createTable(TABLENAME, FAMILY);
     byte[] v1 = Bytes.toBytes("42");
     byte[] v2 = Bytes.toBytes("23");
     byte [][] QUALIFIERS = new byte [][] {
       Bytes.toBytes("a"), Bytes.toBytes("b")
     };
     Append a = new Append(ROW);
     a.add(FAMILY, QUALIFIERS[0], v1);
     a.add(FAMILY, QUALIFIERS[1], v2);
     a.setReturnResults(false);
     assertNullResult(t.append(a));

     a = new Append(ROW);
     a.add(FAMILY, QUALIFIERS[0], v2);
     a.add(FAMILY, QUALIFIERS[1], v1);
     Result r = t.append(a);
     assertEquals(0, Bytes.compareTo(Bytes.add(v1, v2), r.getValue(FAMILY, QUALIFIERS[0])));
     assertEquals(0, Bytes.compareTo(Bytes.add(v2, v1), r.getValue(FAMILY, QUALIFIERS[1])));
   }

   @Test
   public void testIncrement() throws Exception {
@@ -1,6 +1,4 @@
 /**
  * Copyright 2010 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,72 +19,32 @@ package org.apache.hadoop.hbase.regionserver;


 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MultithreadedTestUtil;
 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.NullComparator;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.Test;

 import com.google.common.collect.Lists;


 /**
  * Testing of HRegion.incrementColumnValue
  *
  * Testing of HRegion.incrementColumnValue, HRegion.increment,
  * and HRegion.append
  */
 public class TestIncrement extends HBaseTestCase {
   static final Log LOG = LogFactory.getLog(TestIncrement.class);
 public class TestAtomicOperation extends HBaseTestCase {
   static final Log LOG = LogFactory.getLog(TestAtomicOperation.class);

   HRegion region = null;
   private final String DIR = HBaseTestingUtility.getTestDir() +
@@ -123,6 +81,29 @@ public class TestIncrement extends HBaseTestCase {
   // individual code pieces in the HRegion.
   //////////////////////////////////////////////////////////////////////////////

   /**
    * Test basic append operation.
    * More tests in
    * @see org.apache.hadoop.hbase.client.TestFromClientSide#testAppend()
    */
   public void testAppend() throws IOException {
     initHRegion(tableName, getName(), fam1);
     String v1 = "Ultimate Answer to the Ultimate Question of Life," +
         " The Universe, and Everything";
     String v2 = " is... 42.";
     Append a = new Append(row);
     a.setReturnResults(false);
     a.add(fam1, qual1, Bytes.toBytes(v1));
     a.add(fam1, qual2, Bytes.toBytes(v2));
     assertNull(region.append(a, null, true));
     a = new Append(row);
     a.add(fam1, qual1, Bytes.toBytes(v2));
     a.add(fam1, qual2, Bytes.toBytes(v1));
     Result result = region.append(a, null, true);
     assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1 + v2), result.getValue(fam1, qual1)));
     assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2 + v1), result.getValue(fam1, qual2)));
   }

   /**
    * Test one increment command.
    */