HBASE-2946 Increment multiple columns in a row at once
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1027681 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3846a9c7e4
commit
97f7976755
|
@ -1096,6 +1096,8 @@ Release 0.21.0 - Unreleased
|
|||
HBASE-3053 Add ability to have multiple Masters LocalHBaseCluster for
|
||||
test writing
|
||||
HBASE-2201 JRuby shell for replication
|
||||
HBASE-2946 Increment multiple columns in a row at once
|
||||
|
||||
|
||||
OPTIMIZATIONS
|
||||
HBASE-410 [testing] Speed up the test suite
|
||||
|
|
|
@ -649,6 +649,22 @@ public class HTable implements HTableInterface {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result increment(final Increment increment) throws IOException {
|
||||
if (!increment.hasFamilies()) {
|
||||
throw new IOException(
|
||||
"Invalid arguments to increment, no columns specified");
|
||||
}
|
||||
return connection.getRegionServerWithRetries(
|
||||
new ServerCallable<Result>(connection, tableName, increment.getRow()) {
|
||||
public Result call() throws IOException {
|
||||
return server.increment(
|
||||
location.getRegionInfo().getRegionName(), increment);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long incrementColumnValue(final byte [] row, final byte [] family,
|
||||
final byte [] qualifier, final long amount)
|
||||
|
|
|
@ -19,13 +19,13 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Used to communicate with a single HBase table.
|
||||
*
|
||||
|
@ -231,7 +231,7 @@ public interface HTableInterface {
|
|||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value matches the expected
|
||||
* value. If it does, it adds the delete. If the passed value is null, the
|
||||
* value. If it does, it adds the delete. If the passed value is null, the
|
||||
* check is for the lack of column (ie: non-existance)
|
||||
*
|
||||
* @param row to check
|
||||
|
@ -245,6 +245,21 @@ public interface HTableInterface {
|
|||
boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
|
||||
byte[] value, Delete delete) throws IOException;
|
||||
|
||||
/**
|
||||
* Increments one or more columns within a single row.
|
||||
* <p>
|
||||
* This operation does not appear atomic to readers. Increments are done
|
||||
* under a single row lock, so write operations to a row are synchronized, but
|
||||
* readers do not take row locks so get and scan operations can see this
|
||||
* operation partially completed.
|
||||
*
|
||||
* @param increment object that specifies the columns and amounts to be used
|
||||
* for the increment operations
|
||||
* @throws IOException e
|
||||
* @return values of columns after the increment
|
||||
*/
|
||||
public Result increment(final Increment increment) throws IOException;
|
||||
|
||||
/**
|
||||
* Atomically increments a column value.
|
||||
* <p>
|
||||
|
|
|
@ -0,0 +1,294 @@
|
|||
/**
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
|
||||
/**
|
||||
* Used to perform Increment operations on a single row.
|
||||
* <p>
|
||||
* This operation does not appear atomic to readers. Increments are done
|
||||
* under a single row lock, so write operations to a row are synchronized, but
|
||||
* readers do not take row locks so get and scan operations can see this
|
||||
* operation partially completed.
|
||||
* <p>
|
||||
* To increment columns of a row, instantiate an Increment object with the row
|
||||
* to increment. At least one column to increment must be specified using the
|
||||
* {@link #addColumn(byte[], byte[], long)} method.
|
||||
*/
|
||||
public class Increment implements Writable {
|
||||
private static final byte INCREMENT_VERSION = (byte)1;
|
||||
|
||||
private byte [] row = null;
|
||||
private long lockId = -1L;
|
||||
private boolean writeToWAL = true;
|
||||
private Map<byte [], NavigableMap<byte [], Long>> familyMap =
|
||||
new TreeMap<byte [], NavigableMap<byte [], Long>>(Bytes.BYTES_COMPARATOR);
|
||||
|
||||
/** Constructor for Writable. DO NOT USE */
|
||||
public Increment() {}
|
||||
|
||||
/**
|
||||
* Create a Increment operation for the specified row.
|
||||
* <p>
|
||||
* At least one column must be incremented.
|
||||
* @param row row key
|
||||
*/
|
||||
public Increment(byte [] row) {
|
||||
this(row, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a Increment operation for the specified row, using an existing row
|
||||
* lock.
|
||||
* <p>
|
||||
* At least one column must be incremented.
|
||||
* @param row row key
|
||||
* @param rowLock previously acquired row lock, or null
|
||||
*/
|
||||
public Increment(byte [] row, RowLock rowLock) {
|
||||
this.row = row;
|
||||
if(rowLock != null) {
|
||||
this.lockId = rowLock.getLockId();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment the column from the specific family with the specified qualifier
|
||||
* by the specified amount.
|
||||
* <p>
|
||||
* Overrides previous calls to addColumn for this family and qualifier.
|
||||
* @param family family name
|
||||
* @param qualifier column qualifier
|
||||
* @param amount amount to increment by
|
||||
* @return the Increment object
|
||||
*/
|
||||
public Increment addColumn(byte [] family, byte [] qualifier, long amount) {
|
||||
NavigableMap<byte [], Long> set = familyMap.get(family);
|
||||
if(set == null) {
|
||||
set = new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
|
||||
}
|
||||
set.put(qualifier, amount);
|
||||
familyMap.put(family, set);
|
||||
return this;
|
||||
}
|
||||
|
||||
/* Accessors */
|
||||
|
||||
/**
|
||||
* Method for retrieving the increment's row
|
||||
* @return row
|
||||
*/
|
||||
public byte [] getRow() {
|
||||
return this.row;
|
||||
}
|
||||
|
||||
/**
|
||||
* Method for retrieving the increment's RowLock
|
||||
* @return RowLock
|
||||
*/
|
||||
public RowLock getRowLock() {
|
||||
return new RowLock(this.row, this.lockId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Method for retrieving the increment's lockId
|
||||
* @return lockId
|
||||
*/
|
||||
public long getLockId() {
|
||||
return this.lockId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Method for retrieving whether WAL will be written to or not
|
||||
* @return true if WAL should be used, false if not
|
||||
*/
|
||||
public boolean getWriteToWAL() {
|
||||
return this.writeToWAL;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets whether this operation should write to the WAL or not.
|
||||
* @param writeToWAL true if WAL should be used, false if not
|
||||
* @return this increment operation
|
||||
*/
|
||||
public Increment setWriteToWAL(boolean writeToWAL) {
|
||||
this.writeToWAL = writeToWAL;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Method for retrieving the keys in the familyMap
|
||||
* @return keys in the current familyMap
|
||||
*/
|
||||
public Set<byte[]> familySet() {
|
||||
return this.familyMap.keySet();
|
||||
}
|
||||
|
||||
/**
|
||||
* Method for retrieving the number of families to increment from
|
||||
* @return number of families
|
||||
*/
|
||||
public int numFamilies() {
|
||||
return this.familyMap.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* Method for retrieving the number of columns to increment
|
||||
* @return number of columns across all families
|
||||
*/
|
||||
public int numColumns() {
|
||||
if (!hasFamilies()) return 0;
|
||||
int num = 0;
|
||||
for (NavigableMap<byte [], Long> family : familyMap.values()) {
|
||||
num += family.size();
|
||||
}
|
||||
return num;
|
||||
}
|
||||
|
||||
/**
|
||||
* Method for checking if any families have been inserted into this Increment
|
||||
* @return true if familyMap is non empty false otherwise
|
||||
*/
|
||||
public boolean hasFamilies() {
|
||||
return !this.familyMap.isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Method for retrieving the increment's familyMap
|
||||
* @return familyMap
|
||||
*/
|
||||
public Map<byte[],NavigableMap<byte[], Long>> getFamilyMap() {
|
||||
return this.familyMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return String
|
||||
*/
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("row=");
|
||||
sb.append(Bytes.toString(this.row));
|
||||
if(this.familyMap.size() == 0) {
|
||||
sb.append(", no columns set to be incremented");
|
||||
return sb.toString();
|
||||
}
|
||||
sb.append(", families=");
|
||||
boolean moreThanOne = false;
|
||||
for(Map.Entry<byte [], NavigableMap<byte[], Long>> entry :
|
||||
this.familyMap.entrySet()) {
|
||||
if(moreThanOne) {
|
||||
sb.append("), ");
|
||||
} else {
|
||||
moreThanOne = true;
|
||||
sb.append("{");
|
||||
}
|
||||
sb.append("(family=");
|
||||
sb.append(Bytes.toString(entry.getKey()));
|
||||
sb.append(", columns=");
|
||||
if(entry.getValue() == null) {
|
||||
sb.append("NONE");
|
||||
} else {
|
||||
sb.append("{");
|
||||
boolean moreThanOneB = false;
|
||||
for(Map.Entry<byte [], Long> column : entry.getValue().entrySet()) {
|
||||
if(moreThanOneB) {
|
||||
sb.append(", ");
|
||||
} else {
|
||||
moreThanOneB = true;
|
||||
}
|
||||
sb.append(Bytes.toString(column.getKey()) + "+=" + column.getValue());
|
||||
}
|
||||
sb.append("}");
|
||||
}
|
||||
}
|
||||
sb.append("}");
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
//Writable
|
||||
public void readFields(final DataInput in)
|
||||
throws IOException {
|
||||
int version = in.readByte();
|
||||
if (version > INCREMENT_VERSION) {
|
||||
throw new IOException("unsupported version");
|
||||
}
|
||||
this.row = Bytes.readByteArray(in);
|
||||
this.lockId = in.readLong();
|
||||
int numFamilies = in.readInt();
|
||||
if (numFamilies == 0) {
|
||||
throw new IOException("At least one column required");
|
||||
}
|
||||
this.familyMap =
|
||||
new TreeMap<byte [],NavigableMap<byte [], Long>>(Bytes.BYTES_COMPARATOR);
|
||||
for(int i=0; i<numFamilies; i++) {
|
||||
byte [] family = Bytes.readByteArray(in);
|
||||
boolean hasColumns = in.readBoolean();
|
||||
NavigableMap<byte [], Long> set = null;
|
||||
if(hasColumns) {
|
||||
int numColumns = in.readInt();
|
||||
set = new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
|
||||
for(int j=0; j<numColumns; j++) {
|
||||
byte [] qualifier = Bytes.readByteArray(in);
|
||||
set.put(qualifier, in.readLong());
|
||||
}
|
||||
} else {
|
||||
throw new IOException("At least one column required per family");
|
||||
}
|
||||
this.familyMap.put(family, set);
|
||||
}
|
||||
}
|
||||
|
||||
public void write(final DataOutput out)
|
||||
throws IOException {
|
||||
out.writeByte(INCREMENT_VERSION);
|
||||
Bytes.writeByteArray(out, this.row);
|
||||
out.writeLong(this.lockId);
|
||||
if (familyMap.size() == 0) {
|
||||
throw new IOException("At least one column required");
|
||||
}
|
||||
out.writeInt(familyMap.size());
|
||||
for(Map.Entry<byte [], NavigableMap<byte [], Long>> entry :
|
||||
familyMap.entrySet()) {
|
||||
Bytes.writeByteArray(out, entry.getKey());
|
||||
NavigableMap<byte [], Long> columnSet = entry.getValue();
|
||||
if(columnSet == null) {
|
||||
throw new IOException("At least one column required per family");
|
||||
} else {
|
||||
out.writeBoolean(true);
|
||||
out.writeInt(columnSet.size());
|
||||
for(Map.Entry<byte [], Long> qualifier : columnSet.entrySet()) {
|
||||
Bytes.writeByteArray(out, qualifier.getKey());
|
||||
out.writeLong(qualifier.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
|
|||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Increment;
|
||||
import org.apache.hadoop.hbase.client.MultiPut;
|
||||
import org.apache.hadoop.hbase.client.MultiPutResponse;
|
||||
import org.apache.hadoop.hbase.client.MultiAction;
|
||||
|
@ -191,13 +192,15 @@ public class HbaseObjectWritable implements Writable, Configurable {
|
|||
|
||||
addToMap(NavigableSet.class, code++);
|
||||
addToMap(ColumnPrefixFilter.class, code++);
|
||||
|
||||
|
||||
// Multi
|
||||
addToMap(Row.class, code++);
|
||||
addToMap(Action.class, code++);
|
||||
addToMap(MultiAction.class, code++);
|
||||
addToMap(MultiResponse.class, code++);
|
||||
|
||||
|
||||
addToMap(Increment.class, code++);
|
||||
|
||||
}
|
||||
|
||||
private Class<?> declaredClass;
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
|
|||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Increment;
|
||||
import org.apache.hadoop.hbase.client.MultiAction;
|
||||
import org.apache.hadoop.hbase.client.MultiPut;
|
||||
import org.apache.hadoop.hbase.client.MultiPutResponse;
|
||||
|
@ -193,6 +194,18 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion, Stoppable, Ab
|
|||
byte [] family, byte [] qualifier, long amount, boolean writeToWAL)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Increments one or more columns values in a row. Returns the
|
||||
* updated keys after the increment.
|
||||
* <p>
|
||||
* This operation does not appear atomic to readers. Increments are done
|
||||
* under a row lock but readers do not take row locks.
|
||||
* @param regionName region name
|
||||
* @param increment increment operation
|
||||
* @return incremented cells
|
||||
*/
|
||||
public Result increment(byte[] regionName, Increment increment)
|
||||
throws IOException;
|
||||
|
||||
//
|
||||
// remote scanner interface
|
||||
|
|
|
@ -32,6 +32,7 @@ import java.util.Collection;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
|
@ -54,14 +55,15 @@ import org.apache.hadoop.hbase.DroppedSnapshotException;
|
|||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.UnknownScannerException;
|
||||
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Increment;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.RowLock;
|
||||
|
@ -2958,6 +2960,102 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform one or more increment operations on a row.
|
||||
* <p>
|
||||
* Increments performed are done under row lock but reads do not take locks
|
||||
* out so this can be seen partially complete by gets and scans.
|
||||
* @param increment
|
||||
* @param lockid
|
||||
* @param writeToWAL
|
||||
* @return new keyvalues after increment
|
||||
* @throws IOException
|
||||
*/
|
||||
public Result increment(Increment increment, Integer lockid,
|
||||
boolean writeToWAL)
|
||||
throws IOException {
|
||||
// TODO: Use RWCC to make this set of increments atomic to reads
|
||||
byte [] row = increment.getRow();
|
||||
checkRow(row);
|
||||
boolean flush = false;
|
||||
WALEdit walEdits = null;
|
||||
List<KeyValue> allKVs = new ArrayList<KeyValue>(increment.numColumns());
|
||||
List<KeyValue> kvs = new ArrayList<KeyValue>(increment.numColumns());
|
||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||
long size = 0;
|
||||
|
||||
// Lock row
|
||||
startRegionOperation();
|
||||
try {
|
||||
Integer lid = getLock(lockid, row, true);
|
||||
try {
|
||||
// Process each family
|
||||
for (Map.Entry<byte [], NavigableMap<byte [], Long>> family :
|
||||
increment.getFamilyMap().entrySet()) {
|
||||
|
||||
Store store = stores.get(family.getKey());
|
||||
|
||||
// Get previous values for all columns in this family
|
||||
Get get = new Get(row);
|
||||
for (Map.Entry<byte [], Long> column : family.getValue().entrySet()) {
|
||||
get.addColumn(family.getKey(), column.getKey());
|
||||
}
|
||||
List<KeyValue> results = getLastIncrement(get);
|
||||
|
||||
// Iterate the input columns and update existing values if they were
|
||||
// found, otherwise add new column initialized to the increment amount
|
||||
int idx = 0;
|
||||
for (Map.Entry<byte [], Long> column : family.getValue().entrySet()) {
|
||||
long amount = column.getValue();
|
||||
if (idx < results.size() &&
|
||||
results.get(idx).matchingQualifier(column.getKey())) {
|
||||
amount += Bytes.toLong(results.get(idx).getValue());
|
||||
idx++;
|
||||
}
|
||||
|
||||
// Append new incremented KeyValue to list
|
||||
KeyValue newKV = new KeyValue(row, family.getKey(), column.getKey(),
|
||||
now, Bytes.toBytes(amount));
|
||||
kvs.add(newKV);
|
||||
|
||||
// Append update to WAL
|
||||
if (writeToWAL) {
|
||||
if (walEdits == null) {
|
||||
walEdits = new WALEdit();
|
||||
}
|
||||
walEdits.add(newKV);
|
||||
}
|
||||
}
|
||||
|
||||
// Write the KVs for this family into the store
|
||||
size += store.upsert(kvs);
|
||||
allKVs.addAll(kvs);
|
||||
kvs.clear();
|
||||
}
|
||||
|
||||
// Actually write to WAL now
|
||||
if (writeToWAL) {
|
||||
this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
|
||||
walEdits, now);
|
||||
}
|
||||
|
||||
size = this.memstoreSize.addAndGet(size);
|
||||
flush = isFlushSize(size);
|
||||
} finally {
|
||||
releaseRowLock(lid);
|
||||
}
|
||||
} finally {
|
||||
closeRegionOperation();
|
||||
}
|
||||
|
||||
if (flush) {
|
||||
// Request a cache flush. Do it outside update lock.
|
||||
requestFlush();
|
||||
}
|
||||
|
||||
return new Result(allKVs);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param row
|
||||
|
|
|
@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.client.Delete;
|
|||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||
import org.apache.hadoop.hbase.client.Increment;
|
||||
import org.apache.hadoop.hbase.client.MultiAction;
|
||||
import org.apache.hadoop.hbase.client.MultiPut;
|
||||
import org.apache.hadoop.hbase.client.MultiPutResponse;
|
||||
|
@ -2293,6 +2294,26 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
|||
return this.serverInfo;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Result increment(byte[] regionName, Increment increment)
|
||||
throws IOException {
|
||||
checkOpen();
|
||||
if (regionName == null) {
|
||||
throw new IOException("Invalid arguments to increment " +
|
||||
"regionName is null");
|
||||
}
|
||||
requestCount.incrementAndGet();
|
||||
try {
|
||||
HRegion region = getRegion(regionName);
|
||||
return region.increment(increment, getLockFromId(increment.getLockId()),
|
||||
increment.getWriteToWAL());
|
||||
} catch (IOException e) {
|
||||
checkFileSystem();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public long incrementColumnValue(byte[] regionName, byte[] row,
|
||||
byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
|
||||
|
|
|
@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.management.RuntimeMXBean;
|
||||
import java.rmi.UnexpectedException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -370,8 +371,6 @@ public class MemStore implements HeapSize {
|
|||
try {
|
||||
KeyValue firstKv = KeyValue.createFirstOnRow(
|
||||
row, family, qualifier);
|
||||
// create a new KeyValue with 'now' and a 0 memstoreTS == immediately visible
|
||||
KeyValue newKv;
|
||||
// Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
|
||||
SortedSet<KeyValue> snSs = snapshot.tailSet(firstKv);
|
||||
if (!snSs.isEmpty()) {
|
||||
|
@ -410,48 +409,97 @@ public class MemStore implements HeapSize {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
// add the new value now. this might have the same TS as an existing KV, thus confusing
|
||||
// readers slightly for a MOMENT until we erase the old one (and thus old value).
|
||||
newKv = new KeyValue(row, family, qualifier,
|
||||
now,
|
||||
Bytes.toBytes(newValue));
|
||||
long addedSize = add(newKv);
|
||||
|
||||
// remove extra versions.
|
||||
ss = kvset.tailSet(firstKv);
|
||||
it = ss.iterator();
|
||||
while ( it.hasNext() ) {
|
||||
KeyValue kv = it.next();
|
||||
|
||||
if (kv == newKv) {
|
||||
// ignore the one i just put in (heh)
|
||||
continue;
|
||||
}
|
||||
|
||||
// if this isnt the row we are interested in, then bail:
|
||||
if (!firstKv.matchingColumn(family,qualifier) || !firstKv.matchingRow(kv)) {
|
||||
break; // rows dont match, bail.
|
||||
}
|
||||
|
||||
// if the qualifier matches and it's a put, just RM it out of the kvset.
|
||||
if (firstKv.matchingQualifier(kv)) {
|
||||
// to be extra safe we only remove Puts that have a memstoreTS==0
|
||||
if (kv.getType() == KeyValue.Type.Put.getCode()) {
|
||||
// false means there was a change, so give us the size.
|
||||
addedSize -= heapSizeChange(kv, true);
|
||||
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return addedSize;
|
||||
// create or update (upsert) a new KeyValue with
|
||||
// 'now' and a 0 memstoreTS == immediately visible
|
||||
return upsert(Arrays.asList(new KeyValue [] {
|
||||
new KeyValue(row, family, qualifier, now,
|
||||
Bytes.toBytes(newValue))
|
||||
}));
|
||||
} finally {
|
||||
this.lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update or insert the specified KeyValues.
|
||||
* <p>
|
||||
* For each KeyValue, insert into MemStore. This will atomically upsert the
|
||||
* value for that row/family/qualifier. If a KeyValue did already exist,
|
||||
* it will then be removed.
|
||||
* <p>
|
||||
* Currently the memstoreTS is kept at 0 so as each insert happens, it will
|
||||
* be immediately visible. May want to change this so it is atomic across
|
||||
* all KeyValues.
|
||||
* <p>
|
||||
* This is called under row lock, so Get operations will still see updates
|
||||
* atomically. Scans will only see each KeyValue update as atomic.
|
||||
*
|
||||
* @param kvs
|
||||
* @return change in memstore size
|
||||
*/
|
||||
public long upsert(List<KeyValue> kvs) {
|
||||
this.lock.readLock().lock();
|
||||
try {
|
||||
long size = 0;
|
||||
for (KeyValue kv : kvs) {
|
||||
kv.setMemstoreTS(0);
|
||||
size += upsert(kv);
|
||||
}
|
||||
return size;
|
||||
} finally {
|
||||
this.lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Inserts the specified KeyValue into MemStore and deletes any existing
|
||||
* versions of the same row/family/qualifier as the specified KeyValue.
|
||||
* <p>
|
||||
* First, the specified KeyValue is inserted into the Memstore.
|
||||
* <p>
|
||||
* If there are any existing KeyValues in this MemStore with the same row,
|
||||
* family, and qualifier, they are removed.
|
||||
* @param kv
|
||||
* @return change in size of MemStore
|
||||
*/
|
||||
private long upsert(KeyValue kv) {
|
||||
// Add the KeyValue to the MemStore
|
||||
long addedSize = add(kv);
|
||||
|
||||
// Iterate the KeyValues after the one just inserted, cleaning up any
|
||||
// other KeyValues with the same row/family/qualifier
|
||||
SortedSet<KeyValue> ss = kvset.tailSet(kv);
|
||||
Iterator<KeyValue> it = ss.iterator();
|
||||
while ( it.hasNext() ) {
|
||||
KeyValue cur = it.next();
|
||||
|
||||
if (kv == cur) {
|
||||
// ignore the one just put in
|
||||
continue;
|
||||
}
|
||||
// if this isn't the row we are interested in, then bail
|
||||
if (!kv.matchingRow(cur)) {
|
||||
break;
|
||||
}
|
||||
|
||||
// if the qualifier matches and it's a put, remove it
|
||||
if (kv.matchingQualifier(cur)) {
|
||||
|
||||
// to be extra safe we only remove Puts that have a memstoreTS==0
|
||||
if (kv.getType() == KeyValue.Type.Put.getCode() &&
|
||||
kv.getMemstoreTS() == 0) {
|
||||
// false means there was a change, so give us the size.
|
||||
addedSize -= heapSizeChange(kv, true);
|
||||
it.remove();
|
||||
}
|
||||
} else {
|
||||
// past the column, done
|
||||
break;
|
||||
}
|
||||
}
|
||||
return addedSize;
|
||||
}
|
||||
|
||||
/*
|
||||
* Immutable data structure to hold member found in set and the set it was
|
||||
* found in. Include set because it is carrying context.
|
||||
|
|
|
@ -1386,6 +1386,30 @@ public class Store implements HeapSize {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds or replaces the specified KeyValues.
|
||||
* <p>
|
||||
* For each KeyValue specified, if a cell with the same row, family, and
|
||||
* qualifier exists in MemStore, it will be replaced. Otherwise, it will just
|
||||
* be inserted to MemStore.
|
||||
* <p>
|
||||
* This operation is atomic on each KeyValue (row/family/qualifier) but not
|
||||
* necessarily atomic across all of them.
|
||||
* @param kvs
|
||||
* @return memstore size delta
|
||||
* @throws IOException
|
||||
*/
|
||||
public long upsert(List<KeyValue> kvs)
|
||||
throws IOException {
|
||||
this.lock.readLock().lock();
|
||||
try {
|
||||
// TODO: Make this operation atomic w/ RWCC
|
||||
return this.memstore.upsert(kvs);
|
||||
} finally {
|
||||
this.lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public StoreFlusher getStoreFlusher(long cacheFlushId) {
|
||||
return new StoreFlusherImpl(cacheFlushId);
|
||||
}
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.KeyValue;
|
|||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.HTableInterface;
|
||||
import org.apache.hadoop.hbase.client.Increment;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Row;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
|
@ -588,6 +589,9 @@ public class RemoteHTable implements HTableInterface {
|
|||
throw new IOException("checkAndDelete not supported");
|
||||
}
|
||||
|
||||
public Result increment(Increment increment) throws IOException {
|
||||
throw new IOException("Increment not supported");
|
||||
}
|
||||
|
||||
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
|
||||
long amount) throws IOException {
|
||||
|
|
|
@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
|
|||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.FilterList;
|
||||
import org.apache.hadoop.hbase.filter.PrefixFilter;
|
||||
|
@ -60,6 +59,7 @@ import org.apache.hadoop.hbase.filter.RegexStringComparator;
|
|||
import org.apache.hadoop.hbase.filter.RowFilter;
|
||||
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
|
||||
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
|
@ -261,7 +261,7 @@ public class TestFromClientSide {
|
|||
* logs to ensure that we're not scanning more regions that we're supposed to.
|
||||
* Related to the TestFilterAcrossRegions over in the o.a.h.h.filter package.
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test
|
||||
public void testFilterAcrossMultipleRegions()
|
||||
|
@ -1506,7 +1506,7 @@ public class TestFromClientSide {
|
|||
get.addFamily(FAMILIES[0]);
|
||||
get.setMaxVersions(Integer.MAX_VALUE);
|
||||
result = ht.get(get);
|
||||
assertNResult(result, ROW, FAMILIES[0], QUALIFIER,
|
||||
assertNResult(result, ROW, FAMILIES[0], QUALIFIER,
|
||||
new long [] {ts[1], ts[2], ts[3]},
|
||||
new byte[][] {VALUES[1], VALUES[2], VALUES[3]},
|
||||
0, 2);
|
||||
|
@ -2633,6 +2633,23 @@ public class TestFromClientSide {
|
|||
equals(value, key.getValue()));
|
||||
}
|
||||
|
||||
private void assertIncrementKey(KeyValue key, byte [] row, byte [] family,
|
||||
byte [] qualifier, long value)
|
||||
throws Exception {
|
||||
assertTrue("Expected row [" + Bytes.toString(row) + "] " +
|
||||
"Got row [" + Bytes.toString(key.getRow()) +"]",
|
||||
equals(row, key.getRow()));
|
||||
assertTrue("Expected family [" + Bytes.toString(family) + "] " +
|
||||
"Got family [" + Bytes.toString(key.getFamily()) + "]",
|
||||
equals(family, key.getFamily()));
|
||||
assertTrue("Expected qualifier [" + Bytes.toString(qualifier) + "] " +
|
||||
"Got qualifier [" + Bytes.toString(key.getQualifier()) + "]",
|
||||
equals(qualifier, key.getQualifier()));
|
||||
assertTrue("Expected value [" + value + "] " +
|
||||
"Got value [" + Bytes.toLong(key.getValue()) + "]",
|
||||
Bytes.toLong(key.getValue()) == value);
|
||||
}
|
||||
|
||||
private void assertNumKeys(Result result, int n) throws Exception {
|
||||
assertTrue("Expected " + n + " keys but got " + result.size(),
|
||||
result.size() == n);
|
||||
|
@ -3800,7 +3817,7 @@ public class TestFromClientSide {
|
|||
table.getConnection().clearRegionCache();
|
||||
assertEquals("Clearing cache should have 0 cached ", 0,
|
||||
HConnectionManager.getCachedRegionCount(conf, TABLENAME));
|
||||
|
||||
|
||||
// A Get is suppose to do a region lookup request
|
||||
Get g = new Get(Bytes.toBytes("aaa"));
|
||||
table.get(g);
|
||||
|
@ -3848,5 +3865,76 @@ public class TestFromClientSide {
|
|||
|
||||
LOG.info("Finishing testRegionCachePreWarm");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrement() throws Exception {
|
||||
LOG.info("Starting testIncrement");
|
||||
final byte [] TABLENAME = Bytes.toBytes("testIncrement");
|
||||
HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILY);
|
||||
|
||||
byte [][] ROWS = new byte [][] {
|
||||
Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"),
|
||||
Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"),
|
||||
Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i")
|
||||
};
|
||||
byte [][] QUALIFIERS = new byte [][] {
|
||||
Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"),
|
||||
Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"),
|
||||
Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i")
|
||||
};
|
||||
|
||||
// Do some simple single-column increments
|
||||
|
||||
// First with old API
|
||||
ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[0], 1);
|
||||
ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[1], 2);
|
||||
ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[2], 3);
|
||||
ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[3], 4);
|
||||
|
||||
// Now increment things incremented with old and do some new
|
||||
Increment inc = new Increment(ROW);
|
||||
inc.addColumn(FAMILY, QUALIFIERS[1], 1);
|
||||
inc.addColumn(FAMILY, QUALIFIERS[3], 1);
|
||||
inc.addColumn(FAMILY, QUALIFIERS[4], 1);
|
||||
ht.increment(inc);
|
||||
|
||||
// Verify expected results
|
||||
Result r = ht.get(new Get(ROW));
|
||||
KeyValue [] kvs = r.raw();
|
||||
assertEquals(5, kvs.length);
|
||||
assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[0], 1);
|
||||
assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[1], 3);
|
||||
assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 3);
|
||||
assertIncrementKey(kvs[3], ROW, FAMILY, QUALIFIERS[3], 5);
|
||||
assertIncrementKey(kvs[4], ROW, FAMILY, QUALIFIERS[4], 1);
|
||||
|
||||
// Now try multiple columns by different amounts
|
||||
inc = new Increment(ROWS[0]);
|
||||
for (int i=0;i<QUALIFIERS.length;i++) {
|
||||
inc.addColumn(FAMILY, QUALIFIERS[i], i+1);
|
||||
}
|
||||
ht.increment(inc);
|
||||
// Verify
|
||||
r = ht.get(new Get(ROWS[0]));
|
||||
kvs = r.raw();
|
||||
assertEquals(QUALIFIERS.length, kvs.length);
|
||||
for (int i=0;i<QUALIFIERS.length;i++) {
|
||||
assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], i+1);
|
||||
}
|
||||
|
||||
// Re-increment them
|
||||
inc = new Increment(ROWS[0]);
|
||||
for (int i=0;i<QUALIFIERS.length;i++) {
|
||||
inc.addColumn(FAMILY, QUALIFIERS[i], i+1);
|
||||
}
|
||||
ht.increment(inc);
|
||||
// Verify
|
||||
r = ht.get(new Get(ROWS[0]));
|
||||
kvs = r.raw();
|
||||
assertEquals(QUALIFIERS.length, kvs.length);
|
||||
for (int i=0;i<QUALIFIERS.length;i++) {
|
||||
assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], 2*(i+1));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue