HBASE-7900 Have client Mutations (Put/Delete/etc.) and Result implement CellScanner Interface

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1450017 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2013-02-26 04:33:36 +00:00
parent 34ebf732fc
commit 757ee26c3d
38 changed files with 625 additions and 557 deletions

View File

@ -29,33 +29,16 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Stable @InterfaceStability.Stable
public class Action<R> implements Comparable<R> { public class Action<R> implements Comparable<R> {
// TODO: This class should not be visible outside of the client package.
private Row action; private Row action;
private int originalIndex; private int originalIndex;
private R result; private R result;
/**
* This constructor is replaced by {@link #Action(Row, int)}
*/
@Deprecated
public Action(byte[] regionName, Row action, int originalIndex) {
this(action, originalIndex);
}
public Action(Row action, int originalIndex) { public Action(Row action, int originalIndex) {
super(); super();
this.action = action; this.action = action;
this.originalIndex = originalIndex; this.originalIndex = originalIndex;
} }
@Deprecated
public byte[] getRegionName() {
return null;
}
@Deprecated
public void setRegionName(byte[] regionName) {
}
public R getResult() { public R getResult() {
return result; return result;
@ -73,6 +56,7 @@ public class Action<R> implements Comparable<R> {
return originalIndex; return originalIndex;
} }
@SuppressWarnings("rawtypes")
@Override @Override
public int compareTo(Object o) { public int compareTo(Object o) {
return action.compareTo(((Action) o).getAction()); return action.compareTo(((Action) o).getAction());
@ -85,4 +69,4 @@ public class Action<R> implements Comparable<R> {
Action<?> other = (Action<?>) obj; Action<?> other = (Action<?>) obj;
return compareTo(other) == 0; return compareTo(other) == 0;
} }
} }

View File

@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.client;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hbase.Cell;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -78,13 +80,28 @@ public class Append extends Mutation {
* @return this * @return this
*/ */
public Append add(byte [] family, byte [] qualifier, byte [] value) { public Append add(byte [] family, byte [] qualifier, byte [] value) {
List<KeyValue> list = familyMap.get(family); KeyValue kv = new KeyValue(this.row, family, qualifier, this.ts, KeyValue.Type.Put, value);
if(list == null) { return add(kv);
list = new ArrayList<KeyValue>(); }
/**
* Add column and value to this Append operation.
* @param cell
* @return This instance
*/
@SuppressWarnings("unchecked")
public Append add(final Cell cell) {
// Presume it is KeyValue for now.
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
byte [] family = kv.getFamily();
List<? extends Cell> list = this.familyMap.get(family);
if (list == null) {
list = new ArrayList<Cell>();
} }
list.add(new KeyValue( // Cast so explicit list type rather than ? extends Cell. Help the compiler out. See
this.row, family, qualifier, this.ts, KeyValue.Type.Put, value)); // http://stackoverflow.com/questions/6474784/java-using-generics-with-lists-and-interfaces
familyMap.put(family, list); ((List<KeyValue>)list).add(kv);
this.familyMap.put(family, list);
return this; return this;
} }
} }

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hbase.Cell;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -111,7 +112,9 @@ public class Delete extends Mutation implements Comparable<Row> {
* @return this for invocation chaining * @return this for invocation chaining
* @throws IOException * @throws IOException
*/ */
@SuppressWarnings("unchecked")
public Delete addDeleteMarker(KeyValue kv) throws IOException { public Delete addDeleteMarker(KeyValue kv) throws IOException {
// TODO: Deprecate and rename 'add' so it matches how we add KVs to Puts.
if (!kv.isDelete()) { if (!kv.isDelete()) {
throw new IOException("The recently added KeyValue is not of type " throw new IOException("The recently added KeyValue is not of type "
+ "delete. Rowkey: " + Bytes.toStringBinary(this.row)); + "delete. Rowkey: " + Bytes.toStringBinary(this.row));
@ -124,11 +127,13 @@ public class Delete extends Mutation implements Comparable<Row> {
+ Bytes.toStringBinary(this.row)); + Bytes.toStringBinary(this.row));
} }
byte [] family = kv.getFamily(); byte [] family = kv.getFamily();
List<KeyValue> list = familyMap.get(family); List<? extends Cell> list = familyMap.get(family);
if (list == null) { if (list == null) {
list = new ArrayList<KeyValue>(); list = new ArrayList<Cell>();
} }
list.add(kv); // Cast so explicit list type rather than ? extends Cell. Help the compiler out. See
// http://stackoverflow.com/questions/6474784/java-using-generics-with-lists-and-interfaces
((List<KeyValue>)list).add(kv);
familyMap.put(family, list); familyMap.put(family, list);
return this; return this;
} }
@ -156,14 +161,18 @@ public class Delete extends Mutation implements Comparable<Row> {
* @param timestamp maximum version timestamp * @param timestamp maximum version timestamp
* @return this for invocation chaining * @return this for invocation chaining
*/ */
@SuppressWarnings("unchecked")
public Delete deleteFamily(byte [] family, long timestamp) { public Delete deleteFamily(byte [] family, long timestamp) {
List<KeyValue> list = familyMap.get(family); List<? extends Cell> list = familyMap.get(family);
if(list == null) { if(list == null) {
list = new ArrayList<KeyValue>(); list = new ArrayList<Cell>();
} else if(!list.isEmpty()) { } else if(!list.isEmpty()) {
list.clear(); list.clear();
} }
list.add(new KeyValue(row, family, null, timestamp, KeyValue.Type.DeleteFamily)); KeyValue kv = new KeyValue(row, family, null, timestamp, KeyValue.Type.DeleteFamily);
// Cast so explicit list type rather than ? extends Cell. Help the compiler out. See
// http://stackoverflow.com/questions/6474784/java-using-generics-with-lists-and-interfaces
((List<KeyValue>)list).add(kv);
familyMap.put(family, list); familyMap.put(family, list);
return this; return this;
} }
@ -187,13 +196,16 @@ public class Delete extends Mutation implements Comparable<Row> {
* @param timestamp maximum version timestamp * @param timestamp maximum version timestamp
* @return this for invocation chaining * @return this for invocation chaining
*/ */
@SuppressWarnings("unchecked")
public Delete deleteColumns(byte [] family, byte [] qualifier, long timestamp) { public Delete deleteColumns(byte [] family, byte [] qualifier, long timestamp) {
List<KeyValue> list = familyMap.get(family); List<? extends Cell> list = familyMap.get(family);
if (list == null) { if (list == null) {
list = new ArrayList<KeyValue>(); list = new ArrayList<Cell>();
} }
list.add(new KeyValue(this.row, family, qualifier, timestamp, // Cast so explicit list type rather than ? extends Cell. Help the compiler out. See
KeyValue.Type.DeleteColumn)); // http://stackoverflow.com/questions/6474784/java-using-generics-with-lists-and-interfaces
((List<KeyValue>)list).add(new KeyValue(this.row, family, qualifier, timestamp,
KeyValue.Type.DeleteColumn));
familyMap.put(family, list); familyMap.put(family, list);
return this; return this;
} }
@ -219,13 +231,16 @@ public class Delete extends Mutation implements Comparable<Row> {
* @param timestamp version timestamp * @param timestamp version timestamp
* @return this for invocation chaining * @return this for invocation chaining
*/ */
@SuppressWarnings("unchecked")
public Delete deleteColumn(byte [] family, byte [] qualifier, long timestamp) { public Delete deleteColumn(byte [] family, byte [] qualifier, long timestamp) {
List<KeyValue> list = familyMap.get(family); List<? extends Cell> list = familyMap.get(family);
if(list == null) { if(list == null) {
list = new ArrayList<KeyValue>(); list = new ArrayList<Cell>();
} }
list.add(new KeyValue( // Cast so explicit list type rather than ? extends Cell. Help the compiler out. See
this.row, family, qualifier, timestamp, KeyValue.Type.Delete)); // http://stackoverflow.com/questions/6474784/java-using-generics-with-lists-and-interfaces
KeyValue kv = new KeyValue(this.row, family, qualifier, timestamp, KeyValue.Type.Delete);
((List<KeyValue>)list).add(kv);
familyMap.put(family, list); familyMap.put(family, list);
return this; return this;
} }

View File

@ -41,8 +41,7 @@ import java.util.TreeSet;
* Used to perform Get operations on a single row. * Used to perform Get operations on a single row.
* <p> * <p>
* To get everything for a row, instantiate a Get object with the row to get. * To get everything for a row, instantiate a Get object with the row to get.
* To further define the scope of what to get, perform additional methods as * To further narrow the scope of what to Get, use the methods below.
* outlined below.
* <p> * <p>
* To get all columns from specific families, execute {@link #addFamily(byte[]) addFamily} * To get all columns from specific families, execute {@link #addFamily(byte[]) addFamily}
* for each family to retrieve. * for each family to retrieve.
@ -59,7 +58,7 @@ import java.util.TreeSet;
* To limit the number of versions of each column to be returned, execute * To limit the number of versions of each column to be returned, execute
* {@link #setMaxVersions(int) setMaxVersions}. * {@link #setMaxVersions(int) setMaxVersions}.
* <p> * <p>
* To add a filter, execute {@link #setFilter(Filter) setFilter}. * To add a filter, call {@link #setFilter(Filter) setFilter}.
*/ */
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Stable @InterfaceStability.Stable

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable; import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch;
@ -50,6 +51,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hbase.Cell;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
@ -1098,8 +1100,10 @@ public class HTable implements HTableInterface {
throw new IllegalArgumentException("No columns to insert"); throw new IllegalArgumentException("No columns to insert");
} }
if (maxKeyValueSize > 0) { if (maxKeyValueSize > 0) {
for (List<KeyValue> list : put.getFamilyMap().values()) { for (List<? extends Cell> list : put.getFamilyMap().values()) {
for (KeyValue kv : list) { for (Cell cell : list) {
// KeyValue v1 expectation. Cast for now.
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
if (kv.getLength() > maxKeyValueSize) { if (kv.getLength() > maxKeyValueSize) {
throw new IllegalArgumentException("KeyValue size too large"); throw new IllegalArgumentException("KeyValue size too large");
} }

View File

@ -18,16 +18,19 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hbase.Cell;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
/** /**
* Used to perform Increment operations on a single row. * Used to perform Increment operations on a single row.
@ -43,15 +46,8 @@ import java.util.TreeMap;
*/ */
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Stable @InterfaceStability.Stable
public class Increment implements Row { public class Increment extends Mutation implements Comparable<Row> {
private byte [] row = null;
private boolean writeToWAL = true;
private TimeRange tr = new TimeRange(); private TimeRange tr = new TimeRange();
private Map<byte [], NavigableMap<byte [], Long>> familyMap =
new TreeMap<byte [], NavigableMap<byte [], Long>>(Bytes.BYTES_COMPARATOR);
/** Constructor for Writable. DO NOT USE */
public Increment() {}
/** /**
* Create a Increment operation for the specified row, using an existing row * Create a Increment operation for the specified row, using an existing row
@ -61,10 +57,10 @@ public class Increment implements Row {
* @param row row key * @param row row key
*/ */
public Increment(byte [] row) { public Increment(byte [] row) {
if (row == null) { if (row == null || row.length > HConstants.MAX_ROW_LENGTH) {
throw new IllegalArgumentException("Cannot increment a null row"); throw new IllegalArgumentException("Row key is invalid");
} }
this.row = row; this.row = Arrays.copyOf(row, row.length);
} }
/** /**
@ -77,6 +73,7 @@ public class Increment implements Row {
* @param amount amount to increment by * @param amount amount to increment by
* @return the Increment object * @return the Increment object
*/ */
@SuppressWarnings("unchecked")
public Increment addColumn(byte [] family, byte [] qualifier, long amount) { public Increment addColumn(byte [] family, byte [] qualifier, long amount) {
if (family == null) { if (family == null) {
throw new IllegalArgumentException("family cannot be null"); throw new IllegalArgumentException("family cannot be null");
@ -84,40 +81,10 @@ public class Increment implements Row {
if (qualifier == null) { if (qualifier == null) {
throw new IllegalArgumentException("qualifier cannot be null"); throw new IllegalArgumentException("qualifier cannot be null");
} }
NavigableMap<byte [], Long> set = familyMap.get(family); List<? extends Cell> list = getCellList(family);
if(set == null) { KeyValue kv = createPutKeyValue(family, qualifier, ts, Bytes.toBytes(amount));
set = new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR); ((List<KeyValue>)list).add(kv);
} familyMap.put(kv.getFamily(), list);
set.put(qualifier, amount);
familyMap.put(family, set);
return this;
}
/* Accessors */
/**
* Method for retrieving the increment's row
* @return row
*/
public byte [] getRow() {
return this.row;
}
/**
* Method for retrieving whether WAL will be written to or not
* @return true if WAL should be used, false if not
*/
public boolean getWriteToWAL() {
return this.writeToWAL;
}
/**
* Sets whether this operation should write to the WAL or not.
* @param writeToWAL true if WAL should be used, false if not
* @return this increment operation
*/
public Increment setWriteToWAL(boolean writeToWAL) {
this.writeToWAL = writeToWAL;
return this; return this;
} }
@ -149,14 +116,6 @@ public class Increment implements Row {
return this; return this;
} }
/**
* Method for retrieving the keys in the familyMap
* @return keys in the current familyMap
*/
public Set<byte[]> familySet() {
return this.familyMap.keySet();
}
/** /**
* Method for retrieving the number of families to increment from * Method for retrieving the number of families to increment from
* @return number of families * @return number of families
@ -165,19 +124,6 @@ public class Increment implements Row {
return this.familyMap.size(); return this.familyMap.size();
} }
/**
* Method for retrieving the number of columns to increment
* @return number of columns across all families
*/
public int numColumns() {
if (!hasFamilies()) return 0;
int num = 0;
for (NavigableMap<byte [], Long> family : familyMap.values()) {
num += family.size();
}
return num;
}
/** /**
* Method for checking if any families have been inserted into this Increment * Method for checking if any families have been inserted into this Increment
* @return true if familyMap is non empty false otherwise * @return true if familyMap is non empty false otherwise
@ -186,14 +132,6 @@ public class Increment implements Row {
return !this.familyMap.isEmpty(); return !this.familyMap.isEmpty();
} }
/**
* Method for retrieving the increment's familyMap
* @return familyMap
*/
public Map<byte[],NavigableMap<byte[], Long>> getFamilyMap() {
return this.familyMap;
}
/** /**
* @return String * @return String
*/ */
@ -208,8 +146,7 @@ public class Increment implements Row {
} }
sb.append(", families="); sb.append(", families=");
boolean moreThanOne = false; boolean moreThanOne = false;
for(Map.Entry<byte [], NavigableMap<byte[], Long>> entry : for(Map.Entry<byte [], List<? extends Cell>> entry: this.familyMap.entrySet()) {
this.familyMap.entrySet()) {
if(moreThanOne) { if(moreThanOne) {
sb.append("), "); sb.append("), ");
} else { } else {
@ -224,13 +161,14 @@ public class Increment implements Row {
} else { } else {
sb.append("{"); sb.append("{");
boolean moreThanOneB = false; boolean moreThanOneB = false;
for(Map.Entry<byte [], Long> column : entry.getValue().entrySet()) { for(Cell cell : entry.getValue()) {
if(moreThanOneB) { if(moreThanOneB) {
sb.append(", "); sb.append(", ");
} else { } else {
moreThanOneB = true; moreThanOneB = true;
} }
sb.append(Bytes.toStringBinary(column.getKey()) + "+=" + column.getValue()); KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
sb.append(Bytes.toStringBinary(kv.getKey()) + "+=" + Bytes.toLong(kv.getValue()));
} }
sb.append("}"); sb.append("}");
} }
@ -255,4 +193,4 @@ public class Increment implements Row {
Row other = (Row) obj; Row other = (Row) obj;
return compareTo(other) == 0; return compareTo(other) == 0;
} }
} }

View File

@ -35,9 +35,11 @@ import java.util.TreeMap;
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Evolving @InterfaceStability.Evolving
public final class MultiAction<R> { public final class MultiAction<R> {
// TODO: This class should not be visible outside of the client package.
// map of regions to lists of puts/gets/deletes for that region. // map of regions to lists of puts/gets/deletes for that region.
public Map<byte[], List<Action<R>>> actions = new TreeMap<byte[], List<Action<R>>>(Bytes.BYTES_COMPARATOR); public Map<byte[], List<Action<R>>> actions =
new TreeMap<byte[], List<Action<R>>>(Bytes.BYTES_COMPARATOR);
public MultiAction() { public MultiAction() {
super(); super();
@ -87,4 +89,4 @@ public final class MultiAction<R> {
} }
return res; return res;
} }
} }

View File

@ -23,26 +23,78 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hbase.Cell;
import org.apache.hbase.CellScannable;
import org.apache.hbase.CellScanner;
import org.apache.hbase.CellUtil;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.UUID; import java.util.UUID;
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Evolving @InterfaceStability.Evolving
public abstract class Mutation extends OperationWithAttributes implements Row { public abstract class Mutation extends OperationWithAttributes implements Row, CellScannable {
static final long MUTATION_OVERHEAD = ClassSize.align(
// This
ClassSize.OBJECT +
// OperationWithAttributes map reference? I don't know what the other reference is and if I
// remove it it breaks TestHeapSize so just leaving it.
2 * ClassSize.REFERENCE +
// Timestamp
1 * Bytes.SIZEOF_LONG +
// writeToWAL
Bytes.SIZEOF_BOOLEAN +
// familyMap
ClassSize.REFERENCE +
// familyMap
ClassSize.TREEMAP);
// Attribute used in Mutations to indicate the originating cluster. // Attribute used in Mutations to indicate the originating cluster.
private static final String CLUSTER_ID_ATTR = "_c.id_"; private static final String CLUSTER_ID_ATTR = "_c.id_";
protected byte [] row = null; protected byte [] row = null;
protected long ts = HConstants.LATEST_TIMESTAMP; protected long ts = HConstants.LATEST_TIMESTAMP;
protected boolean writeToWAL = true; protected boolean writeToWAL = true;
protected Map<byte [], List<KeyValue>> familyMap = // A Map sorted by column family.
new TreeMap<byte [], List<KeyValue>>(Bytes.BYTES_COMPARATOR); protected NavigableMap<byte [], List<? extends Cell>> familyMap =
new TreeMap<byte [], List<? extends Cell>>(Bytes.BYTES_COMPARATOR);
@Override
public CellScanner cellScanner() {
return CellUtil.createCellScanner(getFamilyMap());
}
/**
* Creates an empty list if one doesn't exist for the given column family
* or else it returns the associated list of Cell objects.
*
* @param family column family
* @return a list of Cell objects, returns an empty list if one doesn't exist.
*/
List<? extends Cell> getCellList(byte[] family) {
List<? extends Cell> list = this.familyMap.get(family);
if (list == null) {
list = new ArrayList<Cell>();
}
return list;
}
/*
* Create a nnnnnnnn with this objects row key and the Put identifier.
*
* @return a KeyValue with this objects row key and the Put identifier.
*/
KeyValue createPutKeyValue(byte[] family, byte[] qualifier, long ts, byte[] value) {
return new KeyValue(this.row, family, qualifier, ts, KeyValue.Type.Put, value);
}
/** /**
* Compile the column family (i.e. schema) information * Compile the column family (i.e. schema) information
@ -57,9 +109,9 @@ public abstract class Mutation extends OperationWithAttributes implements Row {
// ideally, we would also include table information, but that information // ideally, we would also include table information, but that information
// is not stored in each Operation instance. // is not stored in each Operation instance.
map.put("families", families); map.put("families", families);
for (Map.Entry<byte [], List<KeyValue>> entry : this.familyMap.entrySet()) { for (Map.Entry<byte [], List<? extends Cell>> entry : this.familyMap.entrySet()) {
families.add(Bytes.toStringBinary(entry.getKey())); families.add(Bytes.toStringBinary(entry.getKey()));
} }
return map; return map;
} }
@ -74,7 +126,7 @@ public abstract class Mutation extends OperationWithAttributes implements Row {
public Map<String, Object> toMap(int maxCols) { public Map<String, Object> toMap(int maxCols) {
// we start with the fingerprint map and build on top of it. // we start with the fingerprint map and build on top of it.
Map<String, Object> map = getFingerprint(); Map<String, Object> map = getFingerprint();
// replace the fingerprint's simple list of families with a // replace the fingerprint's simple list of families with a
// map from column families to lists of qualifiers and kv details // map from column families to lists of qualifiers and kv details
Map<String, List<Map<String, Object>>> columns = Map<String, List<Map<String, Object>>> columns =
new HashMap<String, List<Map<String, Object>>>(); new HashMap<String, List<Map<String, Object>>>();
@ -82,20 +134,21 @@ public abstract class Mutation extends OperationWithAttributes implements Row {
map.put("row", Bytes.toStringBinary(this.row)); map.put("row", Bytes.toStringBinary(this.row));
int colCount = 0; int colCount = 0;
// iterate through all column families affected // iterate through all column families affected
for (Map.Entry<byte [], List<KeyValue>> entry : this.familyMap.entrySet()) { for (Map.Entry<byte [], List<? extends Cell>> entry : this.familyMap.entrySet()) {
// map from this family to details for each kv affected within the family // map from this family to details for each cell affected within the family
List<Map<String, Object>> qualifierDetails = List<Map<String, Object>> qualifierDetails = new ArrayList<Map<String, Object>>();
new ArrayList<Map<String, Object>>();
columns.put(Bytes.toStringBinary(entry.getKey()), qualifierDetails); columns.put(Bytes.toStringBinary(entry.getKey()), qualifierDetails);
colCount += entry.getValue().size(); colCount += entry.getValue().size();
if (maxCols <= 0) { if (maxCols <= 0) {
continue; continue;
} }
// add details for each kv // add details for each cell
for (KeyValue kv : entry.getValue()) { for (Cell cell: entry.getValue()) {
if (--maxCols <= 0 ) { if (--maxCols <= 0 ) {
continue; continue;
} }
// KeyValue v1 expectation. Cast for now until we go all Cell all the time.
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
Map<String, Object> kvMap = kv.toStringMap(); Map<String, Object> kvMap = kv.toStringMap();
// row and family information are already available in the bigger map // row and family information are already available in the bigger map
kvMap.remove("row"); kvMap.remove("row");
@ -131,14 +184,16 @@ public abstract class Mutation extends OperationWithAttributes implements Row {
* Method for retrieving the put's familyMap * Method for retrieving the put's familyMap
* @return familyMap * @return familyMap
*/ */
public Map<byte [], List<KeyValue>> getFamilyMap() { public NavigableMap<byte [], List<? extends Cell>> getFamilyMap() {
return this.familyMap; return this.familyMap;
} }
/** /**
* Method for setting the put's familyMap * Method for setting the put's familyMap
*/ */
public void setFamilyMap(Map<byte [], List<KeyValue>> map) { public void setFamilyMap(NavigableMap<byte [], List<? extends Cell>> map) {
// TODO: Shut this down or move it up to be a Constructor. Get new object rather than change
// this internal data member.
this.familyMap = map; this.familyMap = map;
} }
@ -199,8 +254,8 @@ public abstract class Mutation extends OperationWithAttributes implements Row {
*/ */
public int size() { public int size() {
int size = 0; int size = 0;
for(List<KeyValue> kvList : this.familyMap.values()) { for (List<? extends Cell> cells : this.familyMap.values()) {
size += kvList.size(); size += cells.size();
} }
return size; return size;
} }
@ -211,4 +266,37 @@ public abstract class Mutation extends OperationWithAttributes implements Row {
public int numFamilies() { public int numFamilies() {
return familyMap.size(); return familyMap.size();
} }
/**
* @return Calculate what Mutation adds to class heap size.
*/
long heapSize() {
long heapsize = MUTATION_OVERHEAD;
// Adding row
heapsize += ClassSize.align(ClassSize.ARRAY + this.row.length);
// Adding map overhead
heapsize +=
ClassSize.align(this.familyMap.size() * ClassSize.MAP_ENTRY);
for(Map.Entry<byte [], List<? extends Cell>> entry : this.familyMap.entrySet()) {
//Adding key overhead
heapsize +=
ClassSize.align(ClassSize.ARRAY + entry.getKey().length);
//This part is kinds tricky since the JVM can reuse references if you
//store the same value, but have a good match with SizeOf at the moment
//Adding value overhead
heapsize += ClassSize.align(ClassSize.ARRAYLIST);
int size = entry.getValue().size();
heapsize += ClassSize.align(ClassSize.ARRAY +
size * ClassSize.REFERENCE);
for(Cell cell : entry.getValue()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
heapsize += kv.heapSize();
}
}
heapsize += getAttributeSize();
return heapsize;
}
} }

View File

@ -34,6 +34,7 @@ import java.util.Map;
@InterfaceStability.Evolving @InterfaceStability.Evolving
public abstract class Operation { public abstract class Operation {
// TODO make this configurable // TODO make this configurable
// TODO Do we need this anymore now we have protobuffed it all?
private static final int DEFAULT_MAX_COLS = 5; private static final int DEFAULT_MAX_COLS = 5;
/** /**
@ -109,5 +110,4 @@ public abstract class Operation {
public String toString() { public String toString() {
return toString(DEFAULT_MAX_COLS); return toString(DEFAULT_MAX_COLS);
} }
} }

View File

@ -31,7 +31,7 @@ import java.util.Map;
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Evolving @InterfaceStability.Evolving
public abstract class OperationWithAttributes extends Operation implements Attributes { public abstract class OperationWithAttributes extends Operation implements Attributes {
// a opaque blob of attributes // An opaque blob of attributes
private Map<String, byte[]> attributes; private Map<String, byte[]> attributes;
// used for uniquely identifying an operation // used for uniquely identifying an operation

View File

@ -19,14 +19,6 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -34,6 +26,16 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hbase.Cell;
/** /**
* Used to perform Put operations for a single row. * Used to perform Put operations for a single row.
* <p> * <p>
@ -44,11 +46,6 @@ import java.util.TreeMap;
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Stable @InterfaceStability.Stable
public class Put extends Mutation implements HeapSize, Comparable<Row> { public class Put extends Mutation implements HeapSize, Comparable<Row> {
private static final long OVERHEAD = ClassSize.align(
ClassSize.OBJECT + 2 * ClassSize.REFERENCE +
1 * Bytes.SIZEOF_LONG + Bytes.SIZEOF_BOOLEAN +
ClassSize.REFERENCE + ClassSize.TREEMAP);
/** /**
* Create a Put operation for the specified row. * Create a Put operation for the specified row.
* @param row row key * @param row row key
@ -77,10 +74,8 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
*/ */
public Put(Put putToCopy) { public Put(Put putToCopy) {
this(putToCopy.getRow(), putToCopy.ts); this(putToCopy.getRow(), putToCopy.ts);
this.familyMap = this.familyMap = new TreeMap<byte [], List<? extends Cell>>(Bytes.BYTES_COMPARATOR);
new TreeMap<byte [], List<KeyValue>>(Bytes.BYTES_COMPARATOR); for(Map.Entry<byte [], List<? extends Cell>> entry: putToCopy.getFamilyMap().entrySet()) {
for(Map.Entry<byte [], List<KeyValue>> entry :
putToCopy.getFamilyMap().entrySet()) {
this.familyMap.put(entry.getKey(), entry.getValue()); this.familyMap.put(entry.getKey(), entry.getValue());
} }
this.writeToWAL = putToCopy.writeToWAL; this.writeToWAL = putToCopy.writeToWAL;
@ -106,10 +101,11 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
* @param value column value * @param value column value
* @return this * @return this
*/ */
@SuppressWarnings("unchecked")
public Put add(byte [] family, byte [] qualifier, long ts, byte [] value) { public Put add(byte [] family, byte [] qualifier, long ts, byte [] value) {
List<KeyValue> list = getKeyValueList(family); List<? extends Cell> list = getCellList(family);
KeyValue kv = createPutKeyValue(family, qualifier, ts, value); KeyValue kv = createPutKeyValue(family, qualifier, ts, value);
list.add(kv); ((List<KeyValue>)list).add(kv);
familyMap.put(kv.getFamily(), list); familyMap.put(kv.getFamily(), list);
return this; return this;
} }
@ -122,9 +118,10 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
* @return this * @return this
* @throws java.io.IOException e * @throws java.io.IOException e
*/ */
@SuppressWarnings("unchecked")
public Put add(KeyValue kv) throws IOException{ public Put add(KeyValue kv) throws IOException{
byte [] family = kv.getFamily(); byte [] family = kv.getFamily();
List<KeyValue> list = getKeyValueList(family); List<? extends Cell> list = getCellList(family);
//Checking that the row of the kv is the same as the put //Checking that the row of the kv is the same as the put
int res = Bytes.compareTo(this.row, 0, row.length, int res = Bytes.compareTo(this.row, 0, row.length,
kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()); kv.getBuffer(), kv.getRowOffset(), kv.getRowLength());
@ -134,22 +131,11 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
kv.getRowLength()) + " doesn't match the original one " + kv.getRowLength()) + " doesn't match the original one " +
Bytes.toStringBinary(this.row)); Bytes.toStringBinary(this.row));
} }
list.add(kv); ((List<KeyValue>)list).add(kv);
familyMap.put(family, list); familyMap.put(family, list);
return this; return this;
} }
/*
* Create a KeyValue with this objects row key and the Put identifier.
*
* @return a KeyValue with this objects row key and the Put identifier.
*/
private KeyValue createPutKeyValue(byte[] family, byte[] qualifier, long ts,
byte[] value) {
return new KeyValue(this.row, family, qualifier, ts, KeyValue.Type.Put,
value);
}
/** /**
* A convenience method to determine if this object's familyMap contains * A convenience method to determine if this object's familyMap contains
* a value assigned to the given family & qualifier. * a value assigned to the given family & qualifier.
@ -226,7 +212,7 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
*/ */
private boolean has(byte[] family, byte[] qualifier, long ts, byte[] value, private boolean has(byte[] family, byte[] qualifier, long ts, byte[] value,
boolean ignoreTS, boolean ignoreValue) { boolean ignoreTS, boolean ignoreValue) {
List<KeyValue> list = getKeyValueList(family); List<? extends Cell> list = getCellList(family);
if (list.size() == 0) { if (list.size() == 0) {
return false; return false;
} }
@ -236,7 +222,8 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
// F T => 2 // F T => 2
// F F => 1 // F F => 1
if (!ignoreTS && !ignoreValue) { if (!ignoreTS && !ignoreValue) {
for (KeyValue kv : list) { for (Cell cell : list) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
if (Arrays.equals(kv.getFamily(), family) && if (Arrays.equals(kv.getFamily(), family) &&
Arrays.equals(kv.getQualifier(), qualifier) && Arrays.equals(kv.getQualifier(), qualifier) &&
Arrays.equals(kv.getValue(), value) && Arrays.equals(kv.getValue(), value) &&
@ -245,21 +232,24 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
} }
} }
} else if (ignoreValue && !ignoreTS) { } else if (ignoreValue && !ignoreTS) {
for (KeyValue kv : list) { for (Cell cell : list) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
if (Arrays.equals(kv.getFamily(), family) && Arrays.equals(kv.getQualifier(), qualifier) if (Arrays.equals(kv.getFamily(), family) && Arrays.equals(kv.getQualifier(), qualifier)
&& kv.getTimestamp() == ts) { && kv.getTimestamp() == ts) {
return true; return true;
} }
} }
} else if (!ignoreValue && ignoreTS) { } else if (!ignoreValue && ignoreTS) {
for (KeyValue kv : list) { for (Cell cell : list) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
if (Arrays.equals(kv.getFamily(), family) && Arrays.equals(kv.getQualifier(), qualifier) if (Arrays.equals(kv.getFamily(), family) && Arrays.equals(kv.getQualifier(), qualifier)
&& Arrays.equals(kv.getValue(), value)) { && Arrays.equals(kv.getValue(), value)) {
return true; return true;
} }
} }
} else { } else {
for (KeyValue kv : list) { for (Cell cell : list) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
if (Arrays.equals(kv.getFamily(), family) && if (Arrays.equals(kv.getFamily(), family) &&
Arrays.equals(kv.getQualifier(), qualifier)) { Arrays.equals(kv.getQualifier(), qualifier)) {
return true; return true;
@ -279,7 +269,8 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
*/ */
public List<KeyValue> get(byte[] family, byte[] qualifier) { public List<KeyValue> get(byte[] family, byte[] qualifier) {
List<KeyValue> filteredList = new ArrayList<KeyValue>(); List<KeyValue> filteredList = new ArrayList<KeyValue>();
for (KeyValue kv: getKeyValueList(family)) { for (Cell cell: getCellList(family)) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
if (Arrays.equals(kv.getQualifier(), qualifier)) { if (Arrays.equals(kv.getQualifier(), qualifier)) {
filteredList.add(kv); filteredList.add(kv);
} }
@ -287,49 +278,8 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
return filteredList; return filteredList;
} }
/** @Override
* Creates an empty list if one doesnt exist for the given column family
* or else it returns the associated list of KeyValue objects.
*
* @param family column family
* @return a list of KeyValue objects, returns an empty list if one doesnt exist.
*/
private List<KeyValue> getKeyValueList(byte[] family) {
List<KeyValue> list = familyMap.get(family);
if(list == null) {
list = new ArrayList<KeyValue>(0);
}
return list;
}
//HeapSize
public long heapSize() { public long heapSize() {
long heapsize = OVERHEAD; return ClassSize.align((int)super.heapSize());
//Adding row
heapsize += ClassSize.align(ClassSize.ARRAY + this.row.length);
//Adding map overhead
heapsize +=
ClassSize.align(this.familyMap.size() * ClassSize.MAP_ENTRY);
for(Map.Entry<byte [], List<KeyValue>> entry : this.familyMap.entrySet()) {
//Adding key overhead
heapsize +=
ClassSize.align(ClassSize.ARRAY + entry.getKey().length);
//This part is kinds tricky since the JVM can reuse references if you
//store the same value, but have a good match with SizeOf at the moment
//Adding value overhead
heapsize += ClassSize.align(ClassSize.ARRAYLIST);
int size = entry.getValue().size();
heapsize += ClassSize.align(ClassSize.ARRAY +
size * ClassSize.REFERENCE);
for(KeyValue kv : entry.getValue()) {
heapsize += kv.heapSize();
}
}
heapsize += getAttributeSize();
return ClassSize.align((int)heapsize);
} }
} }

View File

@ -24,6 +24,10 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.SplitKeyValue; import org.apache.hadoop.hbase.KeyValue.SplitKeyValue;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hbase.Cell;
import org.apache.hbase.CellScannable;
import org.apache.hbase.CellScanner;
import org.apache.hbase.CellUtil;
import java.nio.BufferOverflowException; import java.nio.BufferOverflowException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -38,7 +42,7 @@ import java.util.TreeMap;
/** /**
* Single row result of a {@link Get} or {@link Scan} query.<p> * Single row result of a {@link Get} or {@link Scan} query.<p>
* *
* This class is NOT THREAD SAFE.<p> * This class is <b>NOT THREAD SAFE</b>.<p>
* *
* Convenience methods are available that return various {@link Map} * Convenience methods are available that return various {@link Map}
* structures and values directly.<p> * structures and values directly.<p>
@ -61,13 +65,13 @@ import java.util.TreeMap;
* Each KeyValue can then be accessed through * Each KeyValue can then be accessed through
* {@link KeyValue#getRow()}, {@link KeyValue#getFamily()}, {@link KeyValue#getQualifier()}, * {@link KeyValue#getRow()}, {@link KeyValue#getFamily()}, {@link KeyValue#getQualifier()},
* {@link KeyValue#getTimestamp()}, and {@link KeyValue#getValue()}.<p> * {@link KeyValue#getTimestamp()}, and {@link KeyValue#getValue()}.<p>
* *
* If you need to overwrite a Result with another Result instance -- as in the old 'mapred' RecordReader next * If you need to overwrite a Result with another Result instance -- as in the old 'mapred' RecordReader next
* invocations -- then create an empty Result with the null constructor and in then use {@link #copyFrom(Result)} * invocations -- then create an empty Result with the null constructor and in then use {@link #copyFrom(Result)}
*/ */
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Stable @InterfaceStability.Stable
public class Result { public class Result implements CellScannable {
private KeyValue [] kvs; private KeyValue [] kvs;
// We're not using java serialization. Transient here is just a marker to say // We're not using java serialization. Transient here is just a marker to say
// that this is where we cache row if we're ever asked for it. // that this is where we cache row if we're ever asked for it.
@ -78,6 +82,7 @@ public class Result {
// never use directly // never use directly
private static byte [] buffer = null; private static byte [] buffer = null;
private static final int PAD_WIDTH = 128; private static final int PAD_WIDTH = 128;
public static final Result EMPTY_RESULT = new Result();
/** /**
* Creates an empty Result w/ no KeyValue payload; returns null if you call {@link #raw()}. * Creates an empty Result w/ no KeyValue payload; returns null if you call {@link #raw()}.
@ -105,7 +110,8 @@ public class Result {
* are already sorted * are already sorted
* @param kvs List of KeyValues * @param kvs List of KeyValues
*/ */
public Result(List<KeyValue> kvs) { public Result(List<? extends Cell> kvs) {
// TODO: Here we presume the passed in Cells are KVs. One day this won't always be so.
this(kvs.toArray(new KeyValue[kvs.size()])); this(kvs.toArray(new KeyValue[kvs.size()]));
} }
@ -678,7 +684,7 @@ public class Result {
*/ */
public static void compareResults(Result res1, Result res2) public static void compareResults(Result res1, Result res2)
throws Exception { throws Exception {
if (res2 == null) { if (res2 == null) {
throw new Exception("There wasn't enough rows, we stopped at " throw new Exception("There wasn't enough rows, we stopped at "
+ Bytes.toStringBinary(res1.getRow())); + Bytes.toStringBinary(res1.getRow()));
} }
@ -706,4 +712,9 @@ public class Result {
this.familyMap = null; this.familyMap = null;
this.kvs = other.kvs; this.kvs = other.kvs;
} }
}
@Override
public CellScanner cellScanner() {
return CellUtil.createCellScanner(this.kvs);
}
}

View File

@ -31,4 +31,4 @@ public interface Row extends Comparable<Row> {
* @return The row. * @return The row.
*/ */
public byte [] getRow(); public byte [] getRow();
} }

View File

@ -38,9 +38,8 @@ import java.util.List;
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class RowMutations implements Row { public class RowMutations implements Row {
private List<Mutation> mutations = new ArrayList<Mutation>(); private final List<Mutation> mutations = new ArrayList<Mutation>();
private byte [] row; private byte [] row;
private static final byte VERSION = (byte)0;
/** Constructor for Writable. DO NOT USE */ /** Constructor for Writable. DO NOT USE */
public RowMutations() {} public RowMutations() {}
@ -100,4 +99,4 @@ public class RowMutations implements Row {
public List<Mutation> getMutations() { public List<Mutation> getMutations() {
return Collections.unmodifiableList(mutations); return Collections.unmodifiableList(mutations);
} }
} }

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
import com.google.protobuf.TextFormat;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -54,7 +56,7 @@ public class ScannerCallable extends ServerCallable<Result[]> {
= "hbase.client.log.scanner.latency.cutoff"; = "hbase.client.log.scanner.latency.cutoff";
public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity"; public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
private static final Log LOG = LogFactory.getLog(ScannerCallable.class); public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
private long scannerId = -1L; private long scannerId = -1L;
private boolean instantiated = false; private boolean instantiated = false;
private boolean closed = false; private boolean closed = false;
@ -135,9 +137,10 @@ public class ScannerCallable extends ServerCallable<Result[]> {
this.scannerId = openScanner(); this.scannerId = openScanner();
} else { } else {
Result [] rrs = null; Result [] rrs = null;
ScanRequest request = null;
try { try {
incRPCcallsMetrics(); incRPCcallsMetrics();
ScanRequest request = request =
RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq); RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
ScanResponse response = null; ScanResponse response = null;
try { try {
@ -174,8 +177,7 @@ public class ScannerCallable extends ServerCallable<Result[]> {
updateResultsMetrics(response); updateResultsMetrics(response);
} catch (IOException e) { } catch (IOException e) {
if (logScannerActivity) { if (logScannerActivity) {
LOG.info("Got exception in fetching from scanner=" LOG.info("Got exception making request " + TextFormat.shortDebugString(request), e);
+ scannerId, e);
} }
IOException ioe = e; IOException ioe = e;
if (e instanceof RemoteException) { if (e instanceof RemoteException) {

View File

@ -245,4 +245,4 @@ public abstract class ServerCallable<T> implements Callable<T> {
} }
return t; return t;
} }
} }

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MasterAdminProtocol; import org.apache.hadoop.hbase.MasterAdminProtocol;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Action; import org.apache.hadoop.hbase.client.Action;
@ -118,7 +119,6 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.NavigableSet; import java.util.NavigableSet;
import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME; import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME;
@ -747,16 +747,15 @@ public final class ProtobufUtil {
} }
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder(); ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
QualifierValue.Builder valueBuilder = QualifierValue.newBuilder(); QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
for (Map.Entry<byte[],NavigableMap<byte[], Long>> for (Map.Entry<byte[], List<? extends Cell>> family: increment.getFamilyMap().entrySet()) {
family: increment.getFamilyMap().entrySet()) {
columnBuilder.setFamily(ByteString.copyFrom(family.getKey())); columnBuilder.setFamily(ByteString.copyFrom(family.getKey()));
columnBuilder.clearQualifierValue(); columnBuilder.clearQualifierValue();
NavigableMap<byte[], Long> values = family.getValue(); List<? extends Cell> values = family.getValue();
if (values != null && values.size() > 0) { if (values != null && values.size() > 0) {
for (Map.Entry<byte[], Long> value: values.entrySet()) { for (Cell cell: values) {
valueBuilder.setQualifier(ByteString.copyFrom(value.getKey())); KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
valueBuilder.setValue(ByteString.copyFrom( valueBuilder.setQualifier(ByteString.copyFrom(kv.getQualifier()));
Bytes.toBytes(value.getValue().longValue()))); valueBuilder.setValue(ByteString.copyFrom(kv.getValue()));
columnBuilder.addQualifierValue(valueBuilder.build()); columnBuilder.addQualifierValue(valueBuilder.build());
} }
} }
@ -791,16 +790,16 @@ public final class ProtobufUtil {
} }
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder(); ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
QualifierValue.Builder valueBuilder = QualifierValue.newBuilder(); QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
for (Map.Entry<byte[],List<KeyValue>> for (Map.Entry<byte[],List<? extends Cell>> family: mutation.getFamilyMap().entrySet()) {
family: mutation.getFamilyMap().entrySet()) {
columnBuilder.setFamily(ByteString.copyFrom(family.getKey())); columnBuilder.setFamily(ByteString.copyFrom(family.getKey()));
columnBuilder.clearQualifierValue(); columnBuilder.clearQualifierValue();
for (KeyValue value: family.getValue()) { for (Cell cell: family.getValue()) {
valueBuilder.setQualifier(ByteString.copyFrom(value.getQualifier())); KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
valueBuilder.setValue(ByteString.copyFrom(value.getValue())); valueBuilder.setQualifier(ByteString.copyFrom(kv.getQualifier()));
valueBuilder.setTimestamp(value.getTimestamp()); valueBuilder.setValue(ByteString.copyFrom(kv.getValue()));
valueBuilder.setTimestamp(kv.getTimestamp());
if (mutateType == MutateType.DELETE) { if (mutateType == MutateType.DELETE) {
KeyValue.Type keyValueType = KeyValue.Type.codeToType(value.getType()); KeyValue.Type keyValueType = KeyValue.Type.codeToType(kv.getType());
valueBuilder.setDeleteType(toDeleteType(keyValueType)); valueBuilder.setDeleteType(toDeleteType(keyValueType));
} }
columnBuilder.addQualifierValue(valueBuilder.build()); columnBuilder.addQualifierValue(valueBuilder.build());
@ -1765,8 +1764,8 @@ public final class ProtobufUtil {
* @throws IOException * @throws IOException
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static <T extends Message> public static <T extends Message>
T getParsedGenericInstance(Class<?> runtimeClass, int position, ByteString b) T getParsedGenericInstance(Class<?> runtimeClass, int position, ByteString b)
throws IOException { throws IOException {
Type type = runtimeClass.getGenericSuperclass(); Type type = runtimeClass.getGenericSuperclass();
Type argType = ((ParameterizedType)type).getActualTypeArguments()[position]; Type argType = ((ParameterizedType)type).getActualTypeArguments()[position];

View File

@ -201,7 +201,8 @@ public class KeyValueUtil {
* @return <code>cell<code> if it is an instance of {@link KeyValue} else we will return a * @return <code>cell<code> if it is an instance of {@link KeyValue} else we will return a
* new {@link KeyValue} instance made from <code>cell</code> * new {@link KeyValue} instance made from <code>cell</code>
*/ */
public static Cell ensureKeyValue(final Cell cell) { public static KeyValue ensureKeyValue(final Cell cell) {
return cell instanceof KeyValue? cell: copyToNewKeyValue(cell); if (cell == null) return null;
return cell instanceof KeyValue? (KeyValue)cell: copyToNewKeyValue(cell);
} }
} }

View File

@ -213,10 +213,12 @@ public final class CellUtil {
* inside Put, etc., keeping Cells organized by family. * inside Put, etc., keeping Cells organized by family.
* @return CellScanner interface over <code>cellIterable</code> * @return CellScanner interface over <code>cellIterable</code>
*/ */
public static CellScanner createCellScanner(final NavigableMap<byte [], List<Cell>> map) { public static CellScanner createCellScanner(final NavigableMap<byte [],
List<? extends Cell>> map) {
return new CellScanner() { return new CellScanner() {
private final Iterator<Entry<byte[], List<Cell>>> entries = map.entrySet().iterator(); private final Iterator<Entry<byte[], List<? extends Cell>>> entries =
private Iterator<Cell> currentIterator = null; map.entrySet().iterator();
private Iterator<? extends Cell> currentIterator = null;
private Cell currentCell; private Cell currentCell;
@Override @Override

View File

@ -45,11 +45,11 @@ public class TestCellUtil {
@Test @Test
public void testCreateCellScannerFamilyMap() { public void testCreateCellScannerFamilyMap() {
final int count = 3; final int count = 3;
final NavigableMap<byte [], List<Cell>> map = final NavigableMap<byte [], List<? extends Cell>> map =
new TreeMap<byte [], List<Cell>>(Bytes.BYTES_COMPARATOR); new TreeMap<byte [], List<? extends Cell>>(Bytes.BYTES_COMPARATOR);
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
byte [] key = Bytes.toBytes(i); byte [] key = Bytes.toBytes(i);
Cell [] cs = getCells(count, key); KeyValue [] cs = getCells(count, key);
map.put(key, Arrays.asList(cs)); map.put(key, Arrays.asList(cs));
} }
CellScanner scanner = CellUtil.createCellScanner(map); CellScanner scanner = CellUtil.createCellScanner(map);
@ -60,8 +60,8 @@ public class TestCellUtil {
assertEquals(count * count, i); assertEquals(count * count, i);
} }
static Cell [] getCells(final int howMany, final byte [] family) { static KeyValue [] getCells(final int howMany, final byte [] family) {
Cell [] cells = new Cell[howMany]; KeyValue [] cells = new KeyValue[howMany];
for (int i = 0; i < howMany; i++) { for (int i = 0; i < howMany; i++) {
byte [] index = Bytes.toBytes(i); byte [] index = Bytes.toBytes(i);
KeyValue kv = new KeyValue(index, family, index, index); KeyValue kv = new KeyValue(index, family, index, index);

View File

@ -50,8 +50,8 @@
</testResource> </testResource>
</testResources> </testResources>
<plugins> <plugins>
<!-- Run with -Dmaven.test.skip.exec=true to build -tests.jar without running <!-- Run with -Dmaven.test.skip.exec=true to build -tests.jar without running
tests (this is needed for upstream projects whose tests need this jar simply for tests (this is needed for upstream projects whose tests need this jar simply for
compilation) --> compilation) -->
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
@ -62,8 +62,8 @@
<mainClass>org/apache/hadoop/hbase/mapreduce/Driver</mainClass> <mainClass>org/apache/hadoop/hbase/mapreduce/Driver</mainClass>
</manifest> </manifest>
</archive> </archive>
<!-- Exclude these 2 packages, because their dependency _binary_ files <!-- Exclude these 2 packages, because their dependency _binary_ files
include the sources, and Maven 2.2 appears to add them to the sources to compile, include the sources, and Maven 2.2 appears to add them to the sources to compile,
weird --> weird -->
<excludes> <excludes>
<exclude>org/apache/jute/**</exclude> <exclude>org/apache/jute/**</exclude>
@ -200,9 +200,10 @@
</plugins> </plugins>
<!-- General Resources --> <!-- General Resources -->
<pluginManagement> <pluginManagement>
<plugins> <plugins>
<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no
the Maven build itself and needs to be kept in plugin management, not in the actual plugins. --> influence on the Maven build itself and needs to be kept in plugin management, not in
the actual plugins. -->
<plugin> <plugin>
<groupId>org.eclipse.m2e</groupId> <groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId> <artifactId>lifecycle-mapping</artifactId>
@ -532,11 +533,13 @@
<configuration> <configuration>
<target> <target>
<mkdir dir="${project.build.directory}/native"/> <mkdir dir="${project.build.directory}/native"/>
<exec executable="cmake" dir="${project.build.directory}/native" <exec executable="cmake" dir="${project.build.directory}/native"
failonerror="true"> failonerror="true">
<arg line="${basedir}/src/main/native -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model}"/> <arg
line="${basedir}/src/main/native -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model}"/>
</exec> </exec>
<exec executable="make" dir="${project.build.directory}/native" failonerror="true"> <exec executable="make" dir="${project.build.directory}/native"
failonerror="true">
<arg line="VERBOSE=1"/> <arg line="VERBOSE=1"/>
</exec> </exec>
</target> </target>
@ -550,8 +553,8 @@
<!-- Profiles for building against different hadoop versions --> <!-- Profiles for building against different hadoop versions -->
<!-- There are a lot of common dependencies used here, should investigate <!-- There are a lot of common dependencies used here, should investigate
if we can combine these profiles somehow --> if we can combine these profiles somehow -->
<!-- profile against Hadoop 1.0.x: This is the default. It has to have the same <!-- profile against Hadoop 1.0.x: This is the default. It has to have the same
activation property as the parent Hadoop 1.0.x profile to make sure it gets run at activation property as the parent Hadoop 1.0.x profile to make sure it gets run at
the same time. --> the same time. -->
<profile> <profile>
<id>hadoop-1.0</id> <id>hadoop-1.0</id>

View File

@ -25,10 +25,12 @@ import java.util.TreeSet;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hbase.Cell;
/** /**
* Emits sorted Puts. * Emits sorted Puts.
@ -61,8 +63,9 @@ public class PutSortReducer extends
// stop at the end or the RAM threshold // stop at the end or the RAM threshold
while (iter.hasNext() && curSize < threshold) { while (iter.hasNext() && curSize < threshold) {
Put p = iter.next(); Put p = iter.next();
for (List<KeyValue> kvs : p.getFamilyMap().values()) { for (List<? extends Cell> cells: p.getFamilyMap().values()) {
for (KeyValue kv : kvs) { for (Cell cell: cells) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
map.add(kv); map.add(kv);
curSize += kv.getLength(); curSize += kv.getLength();
} }

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
@ -135,6 +136,7 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hbase.Cell;
import org.cliffc.high_scale_lib.Counter; import org.cliffc.high_scale_lib.Counter;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -1826,7 +1828,7 @@ public class HRegion implements HeapSize { // , Writable{
* @param writeToWAL * @param writeToWAL
* @throws IOException * @throws IOException
*/ */
void delete(Map<byte[], List<KeyValue>> familyMap, UUID clusterId, void delete(NavigableMap<byte[], List<? extends Cell>> familyMap, UUID clusterId,
boolean writeToWAL) throws IOException { boolean writeToWAL) throws IOException {
Delete delete = new Delete(HConstants.EMPTY_BYTE_ARRAY); Delete delete = new Delete(HConstants.EMPTY_BYTE_ARRAY);
delete.setFamilyMap(familyMap); delete.setFamilyMap(familyMap);
@ -1842,15 +1844,16 @@ public class HRegion implements HeapSize { // , Writable{
* @param byteNow * @param byteNow
* @throws IOException * @throws IOException
*/ */
void prepareDeleteTimestamps(Map<byte[], List<KeyValue>> familyMap, byte[] byteNow) void prepareDeleteTimestamps(Map<byte[], List<? extends Cell>> familyMap, byte[] byteNow)
throws IOException { throws IOException {
for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) { for (Map.Entry<byte[], List<? extends Cell>> e : familyMap.entrySet()) {
byte[] family = e.getKey(); byte[] family = e.getKey();
List<KeyValue> kvs = e.getValue(); List<? extends Cell> cells = e.getValue();
Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
for (KeyValue kv: kvs) { for (Cell cell: cells) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
// Check if time is LATEST, change to time of most recent addition if so // Check if time is LATEST, change to time of most recent addition if so
// This is expensive. // This is expensive.
if (kv.isLatestTimestamp() && kv.isDeleteType()) { if (kv.isLatestTimestamp() && kv.isDeleteType()) {
@ -2064,7 +2067,7 @@ public class HRegion implements HeapSize { // , Writable{
/** Keep track of the locks we hold so we can release them in finally clause */ /** Keep track of the locks we hold so we can release them in finally clause */
List<Integer> acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); List<Integer> acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
// reference family maps directly so coprocessors can mutate them if desired // reference family maps directly so coprocessors can mutate them if desired
Map<byte[],List<KeyValue>>[] familyMaps = new Map[batchOp.operations.length]; Map<byte[], List<? extends Cell>>[] familyMaps = new Map[batchOp.operations.length];
// We try to set up a batch in the range [firstIndex,lastIndexExclusive) // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
int firstIndex = batchOp.nextIndexToProcess; int firstIndex = batchOp.nextIndexToProcess;
int lastIndexExclusive = firstIndex; int lastIndexExclusive = firstIndex;
@ -2083,7 +2086,7 @@ public class HRegion implements HeapSize { // , Writable{
boolean isPutMutation = mutation instanceof Put; boolean isPutMutation = mutation instanceof Put;
Integer providedLockId = nextPair.getSecond(); Integer providedLockId = nextPair.getSecond();
Map<byte[], List<KeyValue>> familyMap = mutation.getFamilyMap(); Map<byte[], List<? extends Cell>> familyMap = mutation.getFamilyMap();
// store the family map reference to allow for mutations // store the family map reference to allow for mutations
familyMaps[lastIndexExclusive] = familyMap; familyMaps[lastIndexExclusive] = familyMap;
@ -2520,15 +2523,15 @@ public class HRegion implements HeapSize { // , Writable{
} }
/** /**
* Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP} with the provided current * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP} with the
* timestamp. * provided current timestamp.
*/ */
void updateKVTimestamps( void updateKVTimestamps(final Iterable<List<? extends Cell>> keyLists, final byte[] now) {
final Iterable<List<KeyValue>> keyLists, final byte[] now) { for (List<? extends Cell> cells: keyLists) {
for (List<KeyValue> keys: keyLists) { if (cells == null) continue;
if (keys == null) continue; for (Cell cell : cells) {
for (KeyValue key : keys) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
key.updateLatestStamp(now); kv.updateLatestStamp(now);
} }
} }
} }
@ -2616,10 +2619,10 @@ public class HRegion implements HeapSize { // , Writable{
* @praram now * @praram now
* @throws IOException * @throws IOException
*/ */
private void put(final byte [] row, byte [] family, List<KeyValue> edits) private void put(final byte [] row, byte [] family, List<? extends Cell> edits)
throws IOException { throws IOException {
Map<byte[], List<KeyValue>> familyMap; NavigableMap<byte[], List<? extends Cell>> familyMap;
familyMap = new HashMap<byte[], List<KeyValue>>(); familyMap = new TreeMap<byte[], List<? extends Cell>>(Bytes.BYTES_COMPARATOR);
familyMap.put(family, edits); familyMap.put(family, edits);
Put p = new Put(row); Put p = new Put(row);
@ -2641,7 +2644,7 @@ public class HRegion implements HeapSize { // , Writable{
* @return the additional memory usage of the memstore caused by the * @return the additional memory usage of the memstore caused by the
* new entries. * new entries.
*/ */
private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap, private long applyFamilyMapToMemstore(Map<byte[], List<? extends Cell>> familyMap,
MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) { MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) {
long size = 0; long size = 0;
boolean freemvcc = false; boolean freemvcc = false;
@ -2652,12 +2655,13 @@ public class HRegion implements HeapSize { // , Writable{
freemvcc = true; freemvcc = true;
} }
for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) { for (Map.Entry<byte[], List<? extends Cell>> e : familyMap.entrySet()) {
byte[] family = e.getKey(); byte[] family = e.getKey();
List<KeyValue> edits = e.getValue(); List<? extends Cell> cells = e.getValue();
Store store = getStore(family); Store store = getStore(family);
for (KeyValue kv: edits) { for (Cell cell: cells) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
kv.setMemstoreTS(localizedWriteEntry.getWriteNumber()); kv.setMemstoreTS(localizedWriteEntry.getWriteNumber());
size += store.add(kv); size += store.add(kv);
} }
@ -2677,7 +2681,7 @@ public class HRegion implements HeapSize { // , Writable{
* the wal. This method is then invoked to rollback the memstore. * the wal. This method is then invoked to rollback the memstore.
*/ */
private void rollbackMemstore(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp, private void rollbackMemstore(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp,
Map<byte[], List<KeyValue>>[] familyMaps, Map<byte[], List<? extends Cell>>[] familyMaps,
int start, int end) { int start, int end) {
int kvsRolledback = 0; int kvsRolledback = 0;
for (int i = start; i < end; i++) { for (int i = start; i < end; i++) {
@ -2688,17 +2692,17 @@ public class HRegion implements HeapSize { // , Writable{
} }
// Rollback all the kvs for this row. // Rollback all the kvs for this row.
Map<byte[], List<KeyValue>> familyMap = familyMaps[i]; Map<byte[], List<? extends Cell>> familyMap = familyMaps[i];
for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) { for (Map.Entry<byte[], List<? extends Cell>> e : familyMap.entrySet()) {
byte[] family = e.getKey(); byte[] family = e.getKey();
List<KeyValue> edits = e.getValue(); List<? extends Cell> cells = e.getValue();
// Remove those keys from the memstore that matches our // Remove those keys from the memstore that matches our
// key's (row, cf, cq, timestamp, memstoreTS). The interesting part is // key's (row, cf, cq, timestamp, memstoreTS). The interesting part is
// that even the memstoreTS has to match for keys that will be rolleded-back. // that even the memstoreTS has to match for keys that will be rolleded-back.
Store store = getStore(family); Store store = getStore(family);
for (KeyValue kv: edits) { for (Cell cell: cells) {
store.rollback(kv); store.rollback(KeyValueUtil.ensureKeyValue(cell));
kvsRolledback++; kvsRolledback++;
} }
} }
@ -2718,18 +2722,19 @@ public class HRegion implements HeapSize { // , Writable{
} }
} }
void checkTimestamps(final Map<byte[], List<KeyValue>> familyMap, void checkTimestamps(final Map<byte[], List<? extends Cell>> familyMap,
long now) throws FailedSanityCheckException { long now) throws FailedSanityCheckException {
if (timestampSlop == HConstants.LATEST_TIMESTAMP) { if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
return; return;
} }
long maxTs = now + timestampSlop; long maxTs = now + timestampSlop;
for (List<KeyValue> kvs : familyMap.values()) { for (List<? extends Cell> kvs : familyMap.values()) {
for (KeyValue kv : kvs) { for (Cell cell : kvs) {
// see if the user-side TS is out of range. latest = server-side // see if the user-side TS is out of range. latest = server-side
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
if (!kv.isLatestTimestamp() && kv.getTimestamp() > maxTs) { if (!kv.isLatestTimestamp() && kv.getTimestamp() > maxTs) {
throw new FailedSanityCheckException("Timestamp for KV out of range " throw new FailedSanityCheckException("Timestamp for KV out of range "
+ kv + " (too.new=" + timestampSlop + ")"); + cell + " (too.new=" + timestampSlop + ")");
} }
} }
} }
@ -2741,11 +2746,11 @@ public class HRegion implements HeapSize { // , Writable{
* @param familyMap map of family->edits * @param familyMap map of family->edits
* @param walEdit the destination entry to append into * @param walEdit the destination entry to append into
*/ */
private void addFamilyMapToWALEdit(Map<byte[], List<KeyValue>> familyMap, private void addFamilyMapToWALEdit(Map<byte[], List<? extends Cell>> familyMap,
WALEdit walEdit) { WALEdit walEdit) {
for (List<KeyValue> edits : familyMap.values()) { for (List<? extends Cell> edits : familyMap.values()) {
for (KeyValue kv : edits) { for (Cell cell : edits) {
walEdit.add(kv); walEdit.add(KeyValueUtil.ensureKeyValue(cell));
} }
} }
} }
@ -3450,7 +3455,7 @@ public class HRegion implements HeapSize { // , Writable{
public HRegionInfo getRegionInfo() { public HRegionInfo getRegionInfo() {
return regionInfo; return regionInfo;
} }
RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region) RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
throws IOException { throws IOException {
// DebugPrint.println("HRegionScanner.<init>"); // DebugPrint.println("HRegionScanner.<init>");
@ -3600,11 +3605,11 @@ public class HRegion implements HeapSize { // , Writable{
return next(outResults, batch, metric); return next(outResults, batch, metric);
} }
private void populateFromJoinedHeap(List<KeyValue> results, int limit, String metric) private void populateFromJoinedHeap(List<KeyValue> results, int limit, String metric)
throws IOException { throws IOException {
assert joinedContinuationRow != null; assert joinedContinuationRow != null;
KeyValue kv = populateResult(results, this.joinedHeap, limit, KeyValue kv = populateResult(results, this.joinedHeap, limit,
joinedContinuationRow.getBuffer(), joinedContinuationRow.getRowOffset(), joinedContinuationRow.getBuffer(), joinedContinuationRow.getRowOffset(),
joinedContinuationRow.getRowLength(), metric); joinedContinuationRow.getRowLength(), metric);
if (kv != KV_LIMIT) { if (kv != KV_LIMIT) {
// We are done with this row, reset the continuation. // We are done with this row, reset the continuation.
@ -3626,7 +3631,7 @@ public class HRegion implements HeapSize { // , Writable{
* @param metric Metric key to be passed into KeyValueHeap::next(). * @param metric Metric key to be passed into KeyValueHeap::next().
* @return KV_LIMIT if limit reached, next KeyValue otherwise. * @return KV_LIMIT if limit reached, next KeyValue otherwise.
*/ */
private KeyValue populateResult(List<KeyValue> results, KeyValueHeap heap, int limit, private KeyValue populateResult(List<KeyValue> results, KeyValueHeap heap, int limit,
byte[] currentRow, int offset, short length, String metric) throws IOException { byte[] currentRow, int offset, short length, String metric) throws IOException {
KeyValue nextKv; KeyValue nextKv;
do { do {
@ -4214,15 +4219,15 @@ public class HRegion implements HeapSize { // , Writable{
// The row key is the region name // The row key is the region name
byte[] row = r.getRegionName(); byte[] row = r.getRegionName();
final long now = EnvironmentEdgeManager.currentTimeMillis(); final long now = EnvironmentEdgeManager.currentTimeMillis();
final List<KeyValue> edits = new ArrayList<KeyValue>(2); final List<KeyValue> cells = new ArrayList<KeyValue>(2);
edits.add(new KeyValue(row, HConstants.CATALOG_FAMILY, cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
HConstants.REGIONINFO_QUALIFIER, now, HConstants.REGIONINFO_QUALIFIER, now,
r.getRegionInfo().toByteArray())); r.getRegionInfo().toByteArray()));
// Set into the root table the version of the meta table. // Set into the root table the version of the meta table.
edits.add(new KeyValue(row, HConstants.CATALOG_FAMILY, cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
HConstants.META_VERSION_QUALIFIER, now, HConstants.META_VERSION_QUALIFIER, now,
Bytes.toBytes(HConstants.META_VERSION))); Bytes.toBytes(HConstants.META_VERSION)));
meta.put(row, HConstants.CATALOG_FAMILY, edits); meta.put(row, HConstants.CATALOG_FAMILY, cells);
} }
/** /**
@ -4819,15 +4824,15 @@ public class HRegion implements HeapSize { // , Writable{
try { try {
long now = EnvironmentEdgeManager.currentTimeMillis(); long now = EnvironmentEdgeManager.currentTimeMillis();
// Process each family // Process each family
for (Map.Entry<byte[], List<KeyValue>> family : append.getFamilyMap() for (Map.Entry<byte[], List<? extends Cell>> family : append.getFamilyMap().entrySet()) {
.entrySet()) {
Store store = stores.get(family.getKey()); Store store = stores.get(family.getKey());
List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size()); List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
// Get previous values for all columns in this family // Get previous values for all columns in this family
Get get = new Get(row); Get get = new Get(row);
for (KeyValue kv : family.getValue()) { for (Cell cell : family.getValue()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
get.addColumn(family.getKey(), kv.getQualifier()); get.addColumn(family.getKey(), kv.getQualifier());
} }
List<KeyValue> results = get(get, false); List<KeyValue> results = get(get, false);
@ -4839,7 +4844,8 @@ public class HRegion implements HeapSize { // , Writable{
// once. // once.
// Would be nice if KeyValue had scatter/gather logic // Would be nice if KeyValue had scatter/gather logic
int idx = 0; int idx = 0;
for (KeyValue kv : family.getValue()) { for (Cell cell : family.getValue()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
KeyValue newKV; KeyValue newKV;
if (idx < results.size() if (idx < results.size()
&& results.get(idx).matchingQualifier(kv.getBuffer(), && results.get(idx).matchingQualifier(kv.getBuffer(),
@ -4913,7 +4919,8 @@ public class HRegion implements HeapSize { // , Writable{
size += store.upsert(entry.getValue(), getSmallestReadPoint()); size += store.upsert(entry.getValue(), getSmallestReadPoint());
} else { } else {
// otherwise keep older versions around // otherwise keep older versions around
for (KeyValue kv : entry.getValue()) { for (Cell cell: entry.getValue()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
size += store.add(kv); size += store.add(kv);
} }
} }
@ -4962,7 +4969,7 @@ public class HRegion implements HeapSize { // , Writable{
TimeRange tr = increment.getTimeRange(); TimeRange tr = increment.getTimeRange();
boolean flush = false; boolean flush = false;
WALEdit walEdits = null; WALEdit walEdits = null;
List<KeyValue> allKVs = new ArrayList<KeyValue>(increment.numColumns()); List<KeyValue> allKVs = new ArrayList<KeyValue>(increment.size());
Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>(); Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>();
long size = 0; long size = 0;
@ -4984,16 +4991,17 @@ public class HRegion implements HeapSize { // , Writable{
try { try {
long now = EnvironmentEdgeManager.currentTimeMillis(); long now = EnvironmentEdgeManager.currentTimeMillis();
// Process each family // Process each family
for (Map.Entry<byte [], NavigableMap<byte [], Long>> family : for (Map.Entry<byte [], List<? extends Cell>> family:
increment.getFamilyMap().entrySet()) { increment.getFamilyMap().entrySet()) {
Store store = stores.get(family.getKey()); Store store = stores.get(family.getKey());
List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size()); List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
// Get previous values for all columns in this family // Get previous values for all columns in this family
Get get = new Get(row); Get get = new Get(row);
for (Map.Entry<byte [], Long> column : family.getValue().entrySet()) { for (Cell cell: family.getValue()) {
get.addColumn(family.getKey(), column.getKey()); KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
get.addColumn(family.getKey(), kv.getQualifier());
} }
get.setTimeRange(tr.getMin(), tr.getMax()); get.setTimeRange(tr.getMin(), tr.getMax());
List<KeyValue> results = get(get, false); List<KeyValue> results = get(get, false);
@ -5001,11 +5009,12 @@ public class HRegion implements HeapSize { // , Writable{
// Iterate the input columns and update existing values if they were // Iterate the input columns and update existing values if they were
// found, otherwise add new column initialized to the increment amount // found, otherwise add new column initialized to the increment amount
int idx = 0; int idx = 0;
for (Map.Entry<byte [], Long> column : family.getValue().entrySet()) { for (Cell cell: family.getValue()) {
long amount = column.getValue(); KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
if (idx < results.size() && long amount = Bytes.toLong(kv.getValue());
results.get(idx).matchingQualifier(column.getKey())) { byte [] qualifier = kv.getQualifier();
KeyValue kv = results.get(idx); if (idx < results.size() && results.get(idx).matchingQualifier(qualifier)) {
kv = results.get(idx);
if(kv.getValueLength() == Bytes.SIZEOF_LONG) { if(kv.getValueLength() == Bytes.SIZEOF_LONG) {
amount += Bytes.toLong(kv.getBuffer(), kv.getValueOffset(), Bytes.SIZEOF_LONG); amount += Bytes.toLong(kv.getBuffer(), kv.getValueOffset(), Bytes.SIZEOF_LONG);
} else { } else {
@ -5017,8 +5026,8 @@ public class HRegion implements HeapSize { // , Writable{
} }
// Append new incremented KeyValue to list // Append new incremented KeyValue to list
KeyValue newKV = new KeyValue(row, family.getKey(), column.getKey(), KeyValue newKV =
now, Bytes.toBytes(amount)); new KeyValue(row, family.getKey(), qualifier, now, Bytes.toBytes(amount));
newKV.setMemstoreTS(w.getWriteNumber()); newKV.setMemstoreTS(w.getWriteNumber());
kvs.add(newKV); kvs.add(newKV);
@ -5053,7 +5062,8 @@ public class HRegion implements HeapSize { // , Writable{
size += store.upsert(entry.getValue(), getSmallestReadPoint()); size += store.upsert(entry.getValue(), getSmallestReadPoint());
} else { } else {
// otherwise keep older versions around // otherwise keep older versions around
for (KeyValue kv : entry.getValue()) { for (Cell cell : entry.getValue()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
size += store.add(kv); size += store.add(kv);
} }
} }
@ -5444,7 +5454,7 @@ public class HRegion implements HeapSize { // , Writable{
* Update counters for numer of puts without wal and the size of possible data loss. * Update counters for numer of puts without wal and the size of possible data loss.
* These information are exposed by the region server metrics. * These information are exposed by the region server metrics.
*/ */
private void recordPutWithoutWal(final Map<byte [], List<KeyValue>> familyMap) { private void recordPutWithoutWal(final Map<byte [], List<? extends Cell>> familyMap) {
numPutsWithoutWAL.increment(); numPutsWithoutWAL.increment();
if (numPutsWithoutWAL.get() <= 1) { if (numPutsWithoutWAL.get() <= 1) {
LOG.info("writing data to region " + this + LOG.info("writing data to region " + this +
@ -5452,8 +5462,9 @@ public class HRegion implements HeapSize { // , Writable{
} }
long putSize = 0; long putSize = 0;
for (List<KeyValue> edits : familyMap.values()) { for (List<? extends Cell> cells: familyMap.values()) {
for (KeyValue kv : edits) { for (Cell cell : cells) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
putSize += kv.getKeyLength() + kv.getValueLength(); putSize += kv.getKeyLength() + kv.getValueLength();
} }
} }

View File

@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.util.CollectionBackedScanner;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hbase.Cell;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableCollection; import com.google.common.collect.ImmutableCollection;
@ -1820,10 +1821,10 @@ public class HStore implements Store {
} }
@Override @Override
public long upsert(Iterable<KeyValue> kvs, long readpoint) throws IOException { public long upsert(Iterable<? extends Cell> cells, long readpoint) throws IOException {
this.lock.readLock().lock(); this.lock.readLock().lock();
try { try {
return this.memstore.upsert(kvs, readpoint); return this.memstore.upsert(cells, readpoint);
} finally { } finally {
this.lock.readLock().unlock(); this.lock.readLock().unlock();
} }

View File

@ -22,7 +22,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean; import java.lang.management.RuntimeMXBean;
import java.rmi.UnexpectedException; import java.rmi.UnexpectedException;
import java.util.Arrays; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -38,11 +38,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation; import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hbase.Cell;
/** /**
* The MemStore holds in-memory modifications to the Store. Modifications * The MemStore holds in-memory modifications to the Store. Modifications
@ -498,9 +500,9 @@ public class MemStore implements HeapSize {
// create or update (upsert) a new KeyValue with // create or update (upsert) a new KeyValue with
// 'now' and a 0 memstoreTS == immediately visible // 'now' and a 0 memstoreTS == immediately visible
return upsert(Arrays.asList( List<Cell> cells = new ArrayList<Cell>(1);
new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue))), 1L cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
); return upsert(cells, 1L);
} finally { } finally {
this.lock.readLock().unlock(); this.lock.readLock().unlock();
} }
@ -520,16 +522,16 @@ public class MemStore implements HeapSize {
* This is called under row lock, so Get operations will still see updates * This is called under row lock, so Get operations will still see updates
* atomically. Scans will only see each KeyValue update as atomic. * atomically. Scans will only see each KeyValue update as atomic.
* *
* @param kvs * @param cells
* @param readpoint readpoint below which we can safely remove duplicate KVs * @param readpoint readpoint below which we can safely remove duplicate KVs
* @return change in memstore size * @return change in memstore size
*/ */
public long upsert(Iterable<KeyValue> kvs, long readpoint) { public long upsert(Iterable<? extends Cell> cells, long readpoint) {
this.lock.readLock().lock(); this.lock.readLock().lock();
try { try {
long size = 0; long size = 0;
for (KeyValue kv : kvs) { for (Cell cell : cells) {
size += upsert(kv, readpoint); size += upsert(cell, readpoint);
} }
return size; return size;
} finally { } finally {
@ -548,16 +550,17 @@ public class MemStore implements HeapSize {
* <p> * <p>
* Callers must hold the read lock. * Callers must hold the read lock.
* *
* @param kv * @param cell
* @return change in size of MemStore * @return change in size of MemStore
*/ */
private long upsert(KeyValue kv, long readpoint) { private long upsert(Cell cell, long readpoint) {
// Add the KeyValue to the MemStore // Add the KeyValue to the MemStore
// Use the internalAdd method here since we (a) already have a lock // Use the internalAdd method here since we (a) already have a lock
// and (b) cannot safely use the MSLAB here without potentially // and (b) cannot safely use the MSLAB here without potentially
// hitting OOME - see TestMemStore.testUpsertMSLAB for a // hitting OOME - see TestMemStore.testUpsertMSLAB for a
// test that triggers the pathological case if we don't avoid MSLAB // test that triggers the pathological case if we don't avoid MSLAB
// here. // here.
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
long addedSize = internalAdd(kv); long addedSize = internalAdd(kv);
// Get the KeyValues for the row/family/qualifier regardless of timestamp. // Get the KeyValues for the row/family/qualifier regardless of timestamp.

View File

@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException; import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
@ -31,6 +32,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProto
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hbase.Cell;
/** /**
* A <code>MultiRowProcessor</code> that performs multiple puts and deletes. * A <code>MultiRowProcessor</code> that performs multiple puts and deletes.
@ -70,7 +72,7 @@ MultiRowMutationProcessorResponse> {
// Check mutations and apply edits to a single WALEdit // Check mutations and apply edits to a single WALEdit
for (Mutation m : mutations) { for (Mutation m : mutations) {
if (m instanceof Put) { if (m instanceof Put) {
Map<byte[], List<KeyValue>> familyMap = m.getFamilyMap(); Map<byte[], List<? extends Cell>> familyMap = m.getFamilyMap();
region.checkFamilies(familyMap.keySet()); region.checkFamilies(familyMap.keySet());
region.checkTimestamps(familyMap, now); region.checkTimestamps(familyMap, now);
region.updateKVTimestamps(familyMap.values(), byteNow); region.updateKVTimestamps(familyMap.values(), byteNow);
@ -83,9 +85,10 @@ MultiRowMutationProcessorResponse> {
"Action must be Put or Delete. But was: " "Action must be Put or Delete. But was: "
+ m.getClass().getName()); + m.getClass().getName());
} }
for (List<KeyValue> edits : m.getFamilyMap().values()) { for (List<? extends Cell> cells: m.getFamilyMap().values()) {
boolean writeToWAL = m.getWriteToWAL(); boolean writeToWAL = m.getWriteToWAL();
for (KeyValue kv : edits) { for (Cell cell : cells) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
mutationKvs.add(kv); mutationKvs.add(kv);
if (writeToWAL) { if (writeToWAL) {
walEdit.add(kv); walEdit.add(kv);

View File

@ -36,8 +36,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hbase.Cell;
import com.google.common.collect.ImmutableList;
/** /**
* Interface for objects that hold a column family in a Region. Its a memstore and a set of zero or * Interface for objects that hold a column family in a Region. Its a memstore and a set of zero or
@ -102,12 +101,12 @@ public interface Store extends HeapSize, StoreConfigInformation {
* <p> * <p>
* This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic
* across all of them. * across all of them.
* @param kvs * @param cells
* @param readpoint readpoint below which we can safely remove duplicate KVs * @param readpoint readpoint below which we can safely remove duplicate KVs
* @return memstore size delta * @return memstore size delta
* @throws IOException * @throws IOException
*/ */
public long upsert(Iterable<KeyValue> kvs, long readpoint) throws IOException; public long upsert(Iterable<? extends Cell> cells, long readpoint) throws IOException;
/** /**
* Adds a value to the memstore * Adds a value to the memstore

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
@ -63,6 +64,7 @@ import org.apache.hadoop.hbase.rest.model.RowModel;
import org.apache.hadoop.hbase.rest.model.ScannerModel; import org.apache.hadoop.hbase.rest.model.ScannerModel;
import org.apache.hadoop.hbase.rest.model.TableSchemaModel; import org.apache.hadoop.hbase.rest.model.TableSchemaModel;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hbase.Cell;
/** /**
* HTable interface to remote tables accessed via REST gateway * HTable interface to remote tables accessed via REST gateway
@ -183,8 +185,9 @@ public class RemoteHTable implements HTableInterface {
protected CellSetModel buildModelFromPut(Put put) { protected CellSetModel buildModelFromPut(Put put) {
RowModel row = new RowModel(put.getRow()); RowModel row = new RowModel(put.getRow());
long ts = put.getTimeStamp(); long ts = put.getTimeStamp();
for (List<KeyValue> kvs: put.getFamilyMap().values()) { for (List<? extends Cell> cells: put.getFamilyMap().values()) {
for (KeyValue kv: kvs) { for (Cell cell: cells) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
row.addCell(new CellModel(kv.getFamily(), kv.getQualifier(), row.addCell(new CellModel(kv.getFamily(), kv.getQualifier(),
ts != HConstants.LATEST_TIMESTAMP ? ts : kv.getTimestamp(), ts != HConstants.LATEST_TIMESTAMP ? ts : kv.getTimestamp(),
kv.getValue())); kv.getValue()));
@ -388,25 +391,26 @@ public class RemoteHTable implements HTableInterface {
// ignores the row specification in the URI // ignores the row specification in the URI
// separate puts by row // separate puts by row
TreeMap<byte[],List<KeyValue>> map = TreeMap<byte[],List<Cell>> map =
new TreeMap<byte[],List<KeyValue>>(Bytes.BYTES_COMPARATOR); new TreeMap<byte[],List<Cell>>(Bytes.BYTES_COMPARATOR);
for (Put put: puts) { for (Put put: puts) {
byte[] row = put.getRow(); byte[] row = put.getRow();
List<KeyValue> kvs = map.get(row); List<Cell> cells = map.get(row);
if (kvs == null) { if (cells == null) {
kvs = new ArrayList<KeyValue>(); cells = new ArrayList<Cell>();
map.put(row, kvs); map.put(row, cells);
} }
for (List<KeyValue> l: put.getFamilyMap().values()) { for (List<? extends Cell> l: put.getFamilyMap().values()) {
kvs.addAll(l); cells.addAll(l);
} }
} }
// build the cell set // build the cell set
CellSetModel model = new CellSetModel(); CellSetModel model = new CellSetModel();
for (Map.Entry<byte[], List<KeyValue>> e: map.entrySet()) { for (Map.Entry<byte[], List<Cell>> e: map.entrySet()) {
RowModel row = new RowModel(e.getKey()); RowModel row = new RowModel(e.getKey());
for (KeyValue kv: e.getValue()) { for (Cell cell: e.getValue()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
row.addCell(new CellModel(kv)); row.addCell(new CellModel(kv));
} }
model.addRow(row); model.addRow(row);

View File

@ -20,6 +20,7 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -37,6 +38,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
@ -69,6 +71,7 @@ import org.apache.hadoop.hbase.security.access.Permission.Action;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hbase.Cell;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap; import com.google.common.collect.ListMultimap;
@ -154,11 +157,12 @@ public class AccessController extends BaseRegionObserver
* table updates. * table updates.
*/ */
void updateACL(RegionCoprocessorEnvironment e, void updateACL(RegionCoprocessorEnvironment e,
final Map<byte[], List<KeyValue>> familyMap) { final Map<byte[], List<? extends Cell>> familyMap) {
Set<byte[]> tableSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); Set<byte[]> tableSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], List<KeyValue>> f : familyMap.entrySet()) { for (Map.Entry<byte[], List<? extends Cell>> f : familyMap.entrySet()) {
List<KeyValue> kvs = f.getValue(); List<? extends Cell> cells = f.getValue();
for (KeyValue kv: kvs) { for (Cell cell: cells) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
if (Bytes.equals(kv.getBuffer(), kv.getFamilyOffset(), if (Bytes.equals(kv.getBuffer(), kv.getFamilyOffset(),
kv.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0, kv.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
AccessControlLists.ACL_LIST_FAMILY.length)) { AccessControlLists.ACL_LIST_FAMILY.length)) {
@ -964,9 +968,15 @@ public class AccessController extends BaseRegionObserver
public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c, public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
final Increment increment) final Increment increment)
throws IOException { throws IOException {
// Create a map of family to qualifiers.
Map<byte[], Set<byte[]>> familyMap = Maps.newHashMap(); Map<byte[], Set<byte[]>> familyMap = Maps.newHashMap();
for (Map.Entry<byte[], ? extends Map<byte[], Long>> entry : increment.getFamilyMap().entrySet()) { for (Map.Entry<byte [], List<? extends Cell>> entry: increment.getFamilyMap().entrySet()) {
familyMap.put(entry.getKey(), entry.getValue().keySet()); Set<byte []> qualifiers = new HashSet<byte []>(entry.getValue().size());
for (Cell cell: entry.getValue()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
qualifiers.add(kv.getQualifier());
}
familyMap.put(entry.getKey(), qualifiers);
} }
requirePermission("increment", Permission.Action.WRITE, c.getEnvironment(), familyMap); requirePermission("increment", Permission.Action.WRITE, c.getEnvironment(), familyMap);
return null; return null;

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.thrift2;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.thrift2.generated.*; import org.apache.hadoop.hbase.thrift2.generated.*;
@ -37,13 +38,13 @@ public class ThriftUtilities {
/** /**
* Creates a {@link Get} (HBase) from a {@link TGet} (Thrift). * Creates a {@link Get} (HBase) from a {@link TGet} (Thrift).
* *
* This ignores any timestamps set on {@link TColumn} objects. * This ignores any timestamps set on {@link TColumn} objects.
* *
* @param in the <code>TGet</code> to convert * @param in the <code>TGet</code> to convert
* *
* @return <code>Get</code> object * @return <code>Get</code> object
* *
* @throws IOException if an invalid time range or max version parameter is given * @throws IOException if an invalid time range or max version parameter is given
*/ */
public static Get getFromThrift(TGet in) throws IOException { public static Get getFromThrift(TGet in) throws IOException {
@ -77,11 +78,11 @@ public class ThriftUtilities {
/** /**
* Converts multiple {@link TGet}s (Thrift) into a list of {@link Get}s (HBase). * Converts multiple {@link TGet}s (Thrift) into a list of {@link Get}s (HBase).
* *
* @param in list of <code>TGet</code>s to convert * @param in list of <code>TGet</code>s to convert
* *
* @return list of <code>Get</code> objects * @return list of <code>Get</code> objects
* *
* @throws IOException if an invalid time range or max version parameter is given * @throws IOException if an invalid time range or max version parameter is given
* @see #getFromThrift(TGet) * @see #getFromThrift(TGet)
*/ */
@ -95,9 +96,9 @@ public class ThriftUtilities {
/** /**
* Creates a {@link TResult} (Thrift) from a {@link Result} (HBase). * Creates a {@link TResult} (Thrift) from a {@link Result} (HBase).
* *
* @param in the <code>Result</code> to convert * @param in the <code>Result</code> to convert
* *
* @return converted result, returns an empty result if the input is <code>null</code> * @return converted result, returns an empty result if the input is <code>null</code>
*/ */
public static TResult resultFromHBase(Result in) { public static TResult resultFromHBase(Result in) {
@ -122,11 +123,11 @@ public class ThriftUtilities {
/** /**
* Converts multiple {@link Result}s (HBase) into a list of {@link TResult}s (Thrift). * Converts multiple {@link Result}s (HBase) into a list of {@link TResult}s (Thrift).
* *
* @param in array of <code>Result</code>s to convert * @param in array of <code>Result</code>s to convert
* *
* @return list of converted <code>TResult</code>s * @return list of converted <code>TResult</code>s
* *
* @see #resultFromHBase(Result) * @see #resultFromHBase(Result)
*/ */
public static List<TResult> resultsFromHBase(Result[] in) { public static List<TResult> resultsFromHBase(Result[] in) {
@ -139,9 +140,9 @@ public class ThriftUtilities {
/** /**
* Creates a {@link Put} (HBase) from a {@link TPut} (Thrift) * Creates a {@link Put} (HBase) from a {@link TPut} (Thrift)
* *
* @param in the <code>TPut</code> to convert * @param in the <code>TPut</code> to convert
* *
* @return converted <code>Put</code> * @return converted <code>Put</code>
*/ */
public static Put putFromThrift(TPut in) { public static Put putFromThrift(TPut in) {
@ -169,11 +170,11 @@ public class ThriftUtilities {
/** /**
* Converts multiple {@link TPut}s (Thrift) into a list of {@link Put}s (HBase). * Converts multiple {@link TPut}s (Thrift) into a list of {@link Put}s (HBase).
* *
* @param in list of <code>TPut</code>s to convert * @param in list of <code>TPut</code>s to convert
* *
* @return list of converted <code>Put</code>s * @return list of converted <code>Put</code>s
* *
* @see #putFromThrift(TPut) * @see #putFromThrift(TPut)
*/ */
public static List<Put> putsFromThrift(List<TPut> in) { public static List<Put> putsFromThrift(List<TPut> in) {
@ -186,9 +187,9 @@ public class ThriftUtilities {
/** /**
* Creates a {@link Delete} (HBase) from a {@link TDelete} (Thrift). * Creates a {@link Delete} (HBase) from a {@link TDelete} (Thrift).
* *
* @param in the <code>TDelete</code> to convert * @param in the <code>TDelete</code> to convert
* *
* @return converted <code>Delete</code> * @return converted <code>Delete</code>
*/ */
public static Delete deleteFromThrift(TDelete in) { public static Delete deleteFromThrift(TDelete in) {
@ -233,11 +234,11 @@ public class ThriftUtilities {
/** /**
* Converts multiple {@link TDelete}s (Thrift) into a list of {@link Delete}s (HBase). * Converts multiple {@link TDelete}s (Thrift) into a list of {@link Delete}s (HBase).
* *
* @param in list of <code>TDelete</code>s to convert * @param in list of <code>TDelete</code>s to convert
* *
* @return list of converted <code>Delete</code>s * @return list of converted <code>Delete</code>s
* *
* @see #deleteFromThrift(TDelete) * @see #deleteFromThrift(TDelete)
*/ */
@ -259,12 +260,14 @@ public class ThriftUtilities {
} }
// Map<family, List<KeyValue>> // Map<family, List<KeyValue>>
for (Map.Entry<byte[], List<KeyValue>> familyEntry : in.getFamilyMap().entrySet()) { for (Map.Entry<byte[], List<? extends org.apache.hbase.Cell>> familyEntry:
in.getFamilyMap().entrySet()) {
TColumn column = new TColumn(ByteBuffer.wrap(familyEntry.getKey())); TColumn column = new TColumn(ByteBuffer.wrap(familyEntry.getKey()));
for (KeyValue keyValue : familyEntry.getValue()) { for (org.apache.hbase.Cell cell: familyEntry.getValue()) {
byte[] family = keyValue.getFamily(); KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
byte[] qualifier = keyValue.getQualifier(); byte[] family = kv.getFamily();
long timestamp = keyValue.getTimestamp(); byte[] qualifier = kv.getQualifier();
long timestamp = kv.getTimestamp();
if (family != null) { if (family != null) {
column.setFamily(family); column.setFamily(family);
} }
@ -272,7 +275,7 @@ public class ThriftUtilities {
column.setQualifier(qualifier); column.setQualifier(qualifier);
} }
if (timestamp != HConstants.LATEST_TIMESTAMP) { if (timestamp != HConstants.LATEST_TIMESTAMP) {
column.setTimestamp(keyValue.getTimestamp()); column.setTimestamp(kv.getTimestamp());
} }
} }
columns.add(column); columns.add(column);

View File

@ -3634,7 +3634,8 @@ public class TestFromClientSide {
assertEquals(put.size(), 1); assertEquals(put.size(), 1);
assertEquals(put.getFamilyMap().get(CONTENTS_FAMILY).size(), 1); assertEquals(put.getFamilyMap().get(CONTENTS_FAMILY).size(), 1);
KeyValue kv = put.getFamilyMap().get(CONTENTS_FAMILY).get(0); // KeyValue v1 expectation. Cast for now until we go all Cell all the time. TODO
KeyValue kv = (KeyValue)put.getFamilyMap().get(CONTENTS_FAMILY).get(0);
assertTrue(Bytes.equals(kv.getFamily(), CONTENTS_FAMILY)); assertTrue(Bytes.equals(kv.getFamily(), CONTENTS_FAMILY));
// will it return null or an empty byte array? // will it return null or an empty byte array?
@ -4158,7 +4159,7 @@ public class TestFromClientSide {
mrmBuilder.addMutationRequest(m2); mrmBuilder.addMutationRequest(m2);
MultiMutateRequest mrm = mrmBuilder.build(); MultiMutateRequest mrm = mrmBuilder.build();
CoprocessorRpcChannel channel = t.coprocessorService(ROW); CoprocessorRpcChannel channel = t.coprocessorService(ROW);
MultiRowMutationService.BlockingInterface service = MultiRowMutationService.BlockingInterface service =
MultiRowMutationService.newBlockingStub(channel); MultiRowMutationService.newBlockingStub(channel);
service.mutateRows(null, mrm); service.mutateRows(null, mrm);
Get g = new Get(ROW); Get g = new Get(ROW);

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hbase.Cell;
/** /**
* A sample region observer that tests the RegionObserver interface. * A sample region observer that tests the RegionObserver interface.
@ -310,27 +311,30 @@ public class SimpleRegionObserver extends BaseRegionObserver {
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
final Put put, final WALEdit edit, final Put put, final WALEdit edit,
final boolean writeToWAL) throws IOException { final boolean writeToWAL) throws IOException {
Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap(); Map<byte[], List<? extends Cell>> familyMap = put.getFamilyMap();
RegionCoprocessorEnvironment e = c.getEnvironment(); RegionCoprocessorEnvironment e = c.getEnvironment();
assertNotNull(e); assertNotNull(e);
assertNotNull(e.getRegion()); assertNotNull(e.getRegion());
assertNotNull(familyMap); assertNotNull(familyMap);
if (Arrays.equals(e.getRegion().getTableDesc().getName(), if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE)) { TestRegionObserverInterface.TEST_TABLE)) {
List<KeyValue> kvs = familyMap.get(TestRegionObserverInterface.A); List<? extends Cell> cells = familyMap.get(TestRegionObserverInterface.A);
assertNotNull(kvs); assertNotNull(cells);
assertNotNull(kvs.get(0)); assertNotNull(cells.get(0));
assertTrue(Bytes.equals(kvs.get(0).getQualifier(), KeyValue kv = (KeyValue)cells.get(0);
assertTrue(Bytes.equals(kv.getQualifier(),
TestRegionObserverInterface.A)); TestRegionObserverInterface.A));
kvs = familyMap.get(TestRegionObserverInterface.B); cells = familyMap.get(TestRegionObserverInterface.B);
assertNotNull(kvs); assertNotNull(cells);
assertNotNull(kvs.get(0)); assertNotNull(cells.get(0));
assertTrue(Bytes.equals(kvs.get(0).getQualifier(), kv = (KeyValue)cells.get(0);
assertTrue(Bytes.equals(kv.getQualifier(),
TestRegionObserverInterface.B)); TestRegionObserverInterface.B));
kvs = familyMap.get(TestRegionObserverInterface.C); cells = familyMap.get(TestRegionObserverInterface.C);
assertNotNull(kvs); assertNotNull(cells);
assertNotNull(kvs.get(0)); assertNotNull(cells.get(0));
assertTrue(Bytes.equals(kvs.get(0).getQualifier(), kv = (KeyValue)cells.get(0);
assertTrue(Bytes.equals(kv.getQualifier(),
TestRegionObserverInterface.C)); TestRegionObserverInterface.C));
} }
hadPrePut = true; hadPrePut = true;
@ -340,28 +344,31 @@ public class SimpleRegionObserver extends BaseRegionObserver {
public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c, public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
final Put put, final WALEdit edit, final Put put, final WALEdit edit,
final boolean writeToWAL) throws IOException { final boolean writeToWAL) throws IOException {
Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap(); Map<byte[], List<? extends Cell>> familyMap = put.getFamilyMap();
RegionCoprocessorEnvironment e = c.getEnvironment(); RegionCoprocessorEnvironment e = c.getEnvironment();
assertNotNull(e); assertNotNull(e);
assertNotNull(e.getRegion()); assertNotNull(e.getRegion());
assertNotNull(familyMap); assertNotNull(familyMap);
List<KeyValue> kvs = familyMap.get(TestRegionObserverInterface.A); List<? extends Cell> cells = familyMap.get(TestRegionObserverInterface.A);
if (Arrays.equals(e.getRegion().getTableDesc().getName(), if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE)) { TestRegionObserverInterface.TEST_TABLE)) {
assertNotNull(kvs); assertNotNull(cells);
assertNotNull(kvs.get(0)); assertNotNull(cells.get(0));
assertTrue(Bytes.equals(kvs.get(0).getQualifier(), // KeyValue v1 expectation. Cast for now until we go all Cell all the time. TODO
TestRegionObserverInterface.A)); KeyValue kv = (KeyValue)cells.get(0);
kvs = familyMap.get(TestRegionObserverInterface.B); assertTrue(Bytes.equals(kv.getQualifier(), TestRegionObserverInterface.A));
assertNotNull(kvs); cells = familyMap.get(TestRegionObserverInterface.B);
assertNotNull(kvs.get(0)); assertNotNull(cells);
assertTrue(Bytes.equals(kvs.get(0).getQualifier(), assertNotNull(cells.get(0));
TestRegionObserverInterface.B)); // KeyValue v1 expectation. Cast for now until we go all Cell all the time. TODO
kvs = familyMap.get(TestRegionObserverInterface.C); kv = (KeyValue)cells.get(0);
assertNotNull(kvs); assertTrue(Bytes.equals(kv.getQualifier(), TestRegionObserverInterface.B));
assertNotNull(kvs.get(0)); cells = familyMap.get(TestRegionObserverInterface.C);
assertTrue(Bytes.equals(kvs.get(0).getQualifier(), assertNotNull(cells);
TestRegionObserverInterface.C)); assertNotNull(cells.get(0));
// KeyValue v1 expectation. Cast for now until we go all Cell all the time. TODO
kv = (KeyValue)cells.get(0);
assertTrue(Bytes.equals(kv.getQualifier(), TestRegionObserverInterface.C));
} }
hadPostPut = true; hadPostPut = true;
} }
@ -370,7 +377,7 @@ public class SimpleRegionObserver extends BaseRegionObserver {
public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c, public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
final Delete delete, final WALEdit edit, final Delete delete, final WALEdit edit,
final boolean writeToWAL) throws IOException { final boolean writeToWAL) throws IOException {
Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap(); Map<byte[], List<? extends Cell>> familyMap = delete.getFamilyMap();
RegionCoprocessorEnvironment e = c.getEnvironment(); RegionCoprocessorEnvironment e = c.getEnvironment();
assertNotNull(e); assertNotNull(e);
assertNotNull(e.getRegion()); assertNotNull(e.getRegion());
@ -384,7 +391,7 @@ public class SimpleRegionObserver extends BaseRegionObserver {
public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c, public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
final Delete delete, final WALEdit edit, final Delete delete, final WALEdit edit,
final boolean writeToWAL) throws IOException { final boolean writeToWAL) throws IOException {
Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap(); Map<byte[], List<? extends Cell>> familyMap = delete.getFamilyMap();
RegionCoprocessorEnvironment e = c.getEnvironment(); RegionCoprocessorEnvironment e = c.getEnvironment();
assertNotNull(e); assertNotNull(e);
assertNotNull(e.getRegion()); assertNotNull(e.getRegion());

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
import org.apache.hbase.Cell;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -206,12 +207,10 @@ public class TestRegionObserverBypass {
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
final Put put, final WALEdit edit, final boolean writeToWAL) final Put put, final WALEdit edit, final boolean writeToWAL)
throws IOException { throws IOException {
Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap(); Map<byte[], List<? extends Cell>> familyMap = put.getFamilyMap();
if (familyMap.containsKey(test)) { if (familyMap.containsKey(test)) {
e.bypass(); e.bypass();
} }
} }
} }
}
}

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hbase.Cell;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
@ -52,8 +53,9 @@ import java.util.Map;
import static org.junit.Assert.*; import static org.junit.Assert.*;
/** /**
* Tests invocation of the {@link org.apache.hadoop.hbase.coprocessor.MasterObserver} * Tests invocation of the
* interface hooks at all appropriate times during normal HMaster operations. * {@link org.apache.hadoop.hbase.coprocessor.MasterObserver} interface hooks at
* all appropriate times during normal HMaster operations.
*/ */
@Category(MediumTests.class) @Category(MediumTests.class)
public class TestWALObserver { public class TestWALObserver {
@ -62,17 +64,11 @@ public class TestWALObserver {
private static byte[] TEST_TABLE = Bytes.toBytes("observedTable"); private static byte[] TEST_TABLE = Bytes.toBytes("observedTable");
private static byte[][] TEST_FAMILY = { Bytes.toBytes("fam1"), private static byte[][] TEST_FAMILY = { Bytes.toBytes("fam1"),
Bytes.toBytes("fam2"), Bytes.toBytes("fam2"), Bytes.toBytes("fam3"), };
Bytes.toBytes("fam3"),
};
private static byte[][] TEST_QUALIFIER = { Bytes.toBytes("q1"), private static byte[][] TEST_QUALIFIER = { Bytes.toBytes("q1"),
Bytes.toBytes("q2"), Bytes.toBytes("q2"), Bytes.toBytes("q3"), };
Bytes.toBytes("q3"),
};
private static byte[][] TEST_VALUE = { Bytes.toBytes("v1"), private static byte[][] TEST_VALUE = { Bytes.toBytes("v1"),
Bytes.toBytes("v2"), Bytes.toBytes("v2"), Bytes.toBytes("v3"), };
Bytes.toBytes("v3"),
};
private static byte[] TEST_ROW = Bytes.toBytes("testRow"); private static byte[] TEST_ROW = Bytes.toBytes("testRow");
private Configuration conf; private Configuration conf;
@ -94,8 +90,8 @@ public class TestWALObserver {
conf.setInt("dfs.client.block.recovery.retries", 2); conf.setInt("dfs.client.block.recovery.retries", 2);
TEST_UTIL.startMiniCluster(1); TEST_UTIL.startMiniCluster(1);
Path hbaseRootDir = Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem()
TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase")); .makeQualified(new Path("/hbase"));
LOG.info("hbase.rootdir=" + hbaseRootDir); LOG.info("hbase.rootdir=" + hbaseRootDir);
conf.set(HConstants.HBASE_DIR, hbaseRootDir.toString()); conf.set(HConstants.HBASE_DIR, hbaseRootDir.toString());
} }
@ -108,11 +104,12 @@ public class TestWALObserver {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
//this.cluster = TEST_UTIL.getDFSCluster(); // this.cluster = TEST_UTIL.getDFSCluster();
this.fs = TEST_UTIL.getDFSCluster().getFileSystem(); this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
this.hbaseRootDir = new Path(conf.get(HConstants.HBASE_DIR)); this.hbaseRootDir = new Path(conf.get(HConstants.HBASE_DIR));
this.dir = new Path(this.hbaseRootDir, TestWALObserver.class.getName()); this.dir = new Path(this.hbaseRootDir, TestWALObserver.class.getName());
this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME); this.oldLogDir = new Path(this.hbaseRootDir,
HConstants.HREGION_OLDLOGDIR_NAME);
this.logDir = new Path(this.hbaseRootDir, HConstants.HREGION_LOGDIR_NAME); this.logDir = new Path(this.hbaseRootDir, HConstants.HREGION_LOGDIR_NAME);
this.logName = HConstants.HREGION_LOGDIR_NAME; this.logName = HConstants.HREGION_LOGDIR_NAME;
@ -127,21 +124,22 @@ public class TestWALObserver {
} }
/** /**
* Test WAL write behavior with WALObserver. The coprocessor monitors * Test WAL write behavior with WALObserver. The coprocessor monitors a
* a WALEdit written to WAL, and ignore, modify, and add KeyValue's for the * WALEdit written to WAL, and ignore, modify, and add KeyValue's for the
* WALEdit. * WALEdit.
*/ */
@Test @Test
public void testWALObserverWriteToWAL() throws Exception { public void testWALObserverWriteToWAL() throws Exception {
HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE)); HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
final HTableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE)); final HTableDescriptor htd = createBasic3FamilyHTD(Bytes
.toString(TEST_TABLE));
Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE)); Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE));
deleteDir(basedir); deleteDir(basedir);
fs.mkdirs(new Path(basedir, hri.getEncodedName())); fs.mkdirs(new Path(basedir, hri.getEncodedName()));
HLog log = HLogFactory.createHLog(this.fs, hbaseRootDir, HLog log = HLogFactory.createHLog(this.fs, hbaseRootDir,
TestWALObserver.class.getName(), this.conf); TestWALObserver.class.getName(), this.conf);
SampleRegionWALObserver cp = getCoprocessor(log); SampleRegionWALObserver cp = getCoprocessor(log);
@ -149,8 +147,7 @@ public class TestWALObserver {
// TEST_FAMILY[1] value shall be changed. // TEST_FAMILY[1] value shall be changed.
// TEST_FAMILY[2] shall be added to WALEdit, although it's not in the put. // TEST_FAMILY[2] shall be added to WALEdit, although it's not in the put.
cp.setTestValues(TEST_TABLE, TEST_ROW, TEST_FAMILY[0], TEST_QUALIFIER[0], cp.setTestValues(TEST_TABLE, TEST_ROW, TEST_FAMILY[0], TEST_QUALIFIER[0],
TEST_FAMILY[1], TEST_QUALIFIER[1], TEST_FAMILY[1], TEST_QUALIFIER[1], TEST_FAMILY[2], TEST_QUALIFIER[2]);
TEST_FAMILY[2], TEST_QUALIFIER[2]);
assertFalse(cp.isPreWALWriteCalled()); assertFalse(cp.isPreWALWriteCalled());
assertFalse(cp.isPostWALWriteCalled()); assertFalse(cp.isPostWALWriteCalled());
@ -160,7 +157,7 @@ public class TestWALObserver {
// Use a Put to create familyMap. // Use a Put to create familyMap.
Put p = creatPutWith2Families(TEST_ROW); Put p = creatPutWith2Families(TEST_ROW);
Map<byte [], List<KeyValue>> familyMap = p.getFamilyMap(); Map<byte[], List<? extends Cell>> familyMap = p.getFamilyMap();
WALEdit edit = new WALEdit(); WALEdit edit = new WALEdit();
addFamilyMapToWALEdit(familyMap, edit); addFamilyMapToWALEdit(familyMap, edit);
@ -224,9 +221,12 @@ public class TestWALObserver {
// WAL replay is handled at HRegion::replayRecoveredEdits(), which is // WAL replay is handled at HRegion::replayRecoveredEdits(), which is
// ultimately called by HRegion::initialize() // ultimately called by HRegion::initialize()
byte[] tableName = Bytes.toBytes("testWALCoprocessorReplay"); byte[] tableName = Bytes.toBytes("testWALCoprocessorReplay");
final HTableDescriptor htd = getBasic3FamilyHTableDescriptor(Bytes.toString(tableName)); final HTableDescriptor htd = getBasic3FamilyHTableDescriptor(Bytes
//final HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(tableName)); .toString(tableName));
//final HRegionInfo hri1 = createBasic3FamilyHRegionInfo(Bytes.toString(tableName)); // final HRegionInfo hri =
// createBasic3FamilyHRegionInfo(Bytes.toString(tableName));
// final HRegionInfo hri1 =
// createBasic3FamilyHRegionInfo(Bytes.toString(tableName));
final HRegionInfo hri = new HRegionInfo(tableName, null, null); final HRegionInfo hri = new HRegionInfo(tableName, null, null);
final Path basedir = new Path(this.hbaseRootDir, Bytes.toString(tableName)); final Path basedir = new Path(this.hbaseRootDir, Bytes.toString(tableName));
@ -235,19 +235,19 @@ public class TestWALObserver {
final Configuration newConf = HBaseConfiguration.create(this.conf); final Configuration newConf = HBaseConfiguration.create(this.conf);
//HLog wal = new HLog(this.fs, this.dir, this.oldLogDir, this.conf); // HLog wal = new HLog(this.fs, this.dir, this.oldLogDir, this.conf);
HLog wal = createWAL(this.conf); HLog wal = createWAL(this.conf);
//Put p = creatPutWith2Families(TEST_ROW); // Put p = creatPutWith2Families(TEST_ROW);
WALEdit edit = new WALEdit(); WALEdit edit = new WALEdit();
long now = EnvironmentEdgeManager.currentTimeMillis(); long now = EnvironmentEdgeManager.currentTimeMillis();
//addFamilyMapToWALEdit(p.getFamilyMap(), edit); // addFamilyMapToWALEdit(p.getFamilyMap(), edit);
final int countPerFamily = 1000; final int countPerFamily = 1000;
//for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) { // for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
for (HColumnDescriptor hcd: htd.getFamilies()) { for (HColumnDescriptor hcd : htd.getFamilies()) {
//addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily, // addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
//EnvironmentEdgeManager.getDelegate(), wal); // EnvironmentEdgeManager.getDelegate(), wal);
addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily, addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
EnvironmentEdgeManager.getDelegate(), wal, htd); EnvironmentEdgeManager.getDelegate(), wal, htd);
} }
wal.append(hri, tableName, edit, now, htd); wal.append(hri, tableName, edit, now, htd);
// sync to fs. // sync to fs.
@ -281,32 +281,34 @@ public class TestWALObserver {
} }
/** /**
* Test to see CP loaded successfully or not. There is a duplication * Test to see CP loaded successfully or not. There is a duplication at
* at TestHLog, but the purpose of that one is to see whether the loaded * TestHLog, but the purpose of that one is to see whether the loaded CP will
* CP will impact existing HLog tests or not. * impact existing HLog tests or not.
*/ */
@Test @Test
public void testWALObserverLoaded() throws Exception { public void testWALObserverLoaded() throws Exception {
HLog log = HLogFactory.createHLog(fs, hbaseRootDir, HLog log = HLogFactory.createHLog(fs, hbaseRootDir,
TestWALObserver.class.getName(), conf); TestWALObserver.class.getName(), conf);
assertNotNull(getCoprocessor(log)); assertNotNull(getCoprocessor(log));
} }
private SampleRegionWALObserver getCoprocessor(HLog wal) throws Exception { private SampleRegionWALObserver getCoprocessor(HLog wal) throws Exception {
WALCoprocessorHost host = wal.getCoprocessorHost(); WALCoprocessorHost host = wal.getCoprocessorHost();
Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName()); Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class
return (SampleRegionWALObserver)c; .getName());
return (SampleRegionWALObserver) c;
} }
/* /*
* Creates an HRI around an HTD that has <code>tableName</code> and three * Creates an HRI around an HTD that has <code>tableName</code> and three
* column families named. * column families named.
*
* @param tableName Name of table to use when we create HTableDescriptor. * @param tableName Name of table to use when we create HTableDescriptor.
*/ */
private HRegionInfo createBasic3FamilyHRegionInfo(final String tableName) { private HRegionInfo createBasic3FamilyHRegionInfo(final String tableName) {
HTableDescriptor htd = new HTableDescriptor(tableName); HTableDescriptor htd = new HTableDescriptor(tableName);
for (int i = 0; i < TEST_FAMILY.length; i++ ) { for (int i = 0; i < TEST_FAMILY.length; i++) {
HColumnDescriptor a = new HColumnDescriptor(TEST_FAMILY[i]); HColumnDescriptor a = new HColumnDescriptor(TEST_FAMILY[i]);
htd.addFamily(a); htd.addFamily(a);
} }
@ -326,27 +328,30 @@ public class TestWALObserver {
private Put creatPutWith2Families(byte[] row) throws IOException { private Put creatPutWith2Families(byte[] row) throws IOException {
Put p = new Put(row); Put p = new Put(row);
for (int i = 0; i < TEST_FAMILY.length-1; i++ ) { for (int i = 0; i < TEST_FAMILY.length - 1; i++) {
p.add(TEST_FAMILY[i], TEST_QUALIFIER[i], p.add(TEST_FAMILY[i], TEST_QUALIFIER[i], TEST_VALUE[i]);
TEST_VALUE[i]);
} }
return p; return p;
} }
/** /**
* Copied from HRegion. * Copied from HRegion.
* *
* @param familyMap map of family->edits * @param familyMap
* @param walEdit the destination entry to append into * map of family->edits
* @param walEdit
* the destination entry to append into
*/ */
private void addFamilyMapToWALEdit(Map<byte[], List<KeyValue>> familyMap, private void addFamilyMapToWALEdit(Map<byte[], List<? extends Cell>> familyMap,
WALEdit walEdit) { WALEdit walEdit) {
for (List<KeyValue> edits : familyMap.values()) { for (List<? extends Cell> edits : familyMap.values()) {
for (KeyValue kv : edits) { for (Cell cell : edits) {
walEdit.add(kv); // KeyValue v1 expectation. Cast for now until we go all Cell all the time. TODO.
walEdit.add((KeyValue)cell);
} }
} }
} }
private Path runWALSplit(final Configuration c) throws IOException { private Path runWALSplit(final Configuration c) throws IOException {
FileSystem fs = FileSystem.get(c); FileSystem fs = FileSystem.get(c);
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(c, HLogSplitter logSplitter = HLogSplitter.createLogSplitter(c,
@ -359,28 +364,31 @@ public class TestWALObserver {
LOG.info("Split file=" + splits.get(0)); LOG.info("Split file=" + splits.get(0));
return splits.get(0); return splits.get(0);
} }
private HLog createWAL(final Configuration c) throws IOException { private HLog createWAL(final Configuration c) throws IOException {
return HLogFactory.createHLog(FileSystem.get(c), hbaseRootDir, logName, c); return HLogFactory.createHLog(FileSystem.get(c), hbaseRootDir, logName, c);
} }
private void addWALEdits (final byte [] tableName, final HRegionInfo hri,
final byte [] rowName, final byte [] family, private void addWALEdits(final byte[] tableName, final HRegionInfo hri,
final int count, EnvironmentEdge ee, final HLog wal, final HTableDescriptor htd) final byte[] rowName, final byte[] family, final int count,
throws IOException { EnvironmentEdge ee, final HLog wal, final HTableDescriptor htd)
throws IOException {
String familyStr = Bytes.toString(family); String familyStr = Bytes.toString(family);
for (int j = 0; j < count; j++) { for (int j = 0; j < count; j++) {
byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j)); byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j)); byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j));
WALEdit edit = new WALEdit(); WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, qualifierBytes, edit.add(new KeyValue(rowName, family, qualifierBytes, ee
ee.currentTimeMillis(), columnBytes)); .currentTimeMillis(), columnBytes));
wal.append(hri, tableName, edit, ee.currentTimeMillis(), htd); wal.append(hri, tableName, edit, ee.currentTimeMillis(), htd);
} }
} }
private HTableDescriptor getBasic3FamilyHTableDescriptor( private HTableDescriptor getBasic3FamilyHTableDescriptor(
final String tableName) { final String tableName) {
HTableDescriptor htd = new HTableDescriptor(tableName); HTableDescriptor htd = new HTableDescriptor(tableName);
for (int i = 0; i < TEST_FAMILY.length; i++ ) { for (int i = 0; i < TEST_FAMILY.length; i++) {
HColumnDescriptor a = new HColumnDescriptor(TEST_FAMILY[i]); HColumnDescriptor a = new HColumnDescriptor(TEST_FAMILY[i]);
htd.addFamily(a); htd.addFamily(a);
} }
@ -398,7 +406,4 @@ public class TestWALObserver {
return htd; return htd;
} }
} }

View File

@ -23,9 +23,9 @@ import java.io.InterruptedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -93,6 +93,7 @@ import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType; import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hbase.Cell;
import org.apache.hbase.CellComparator; import org.apache.hbase.CellComparator;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -1144,7 +1145,8 @@ public class TestHRegion extends HBaseTestCase {
//testing existing family //testing existing family
byte [] family = fam2; byte [] family = fam2;
try { try {
Map<byte[], List<KeyValue>> deleteMap = new HashMap<byte[], List<KeyValue>>(); NavigableMap<byte[], List<? extends Cell>> deleteMap =
new TreeMap<byte[], List<? extends Cell>>(Bytes.BYTES_COMPARATOR);
deleteMap.put(family, kvs); deleteMap.put(family, kvs);
region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, true); region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, true);
} catch (Exception e) { } catch (Exception e) {
@ -1155,7 +1157,8 @@ public class TestHRegion extends HBaseTestCase {
boolean ok = false; boolean ok = false;
family = fam4; family = fam4;
try { try {
Map<byte[], List<KeyValue>> deleteMap = new HashMap<byte[], List<KeyValue>>(); NavigableMap<byte[], List<? extends Cell>> deleteMap =
new TreeMap<byte[], List<? extends Cell>>(Bytes.BYTES_COMPARATOR);
deleteMap.put(family, kvs); deleteMap.put(family, kvs);
region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, true); region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, true);
} catch (Exception e) { } catch (Exception e) {
@ -1482,7 +1485,8 @@ public class TestHRegion extends HBaseTestCase {
kvs.add(new KeyValue(row1, fam1, col2, null)); kvs.add(new KeyValue(row1, fam1, col2, null));
kvs.add(new KeyValue(row1, fam1, col3, null)); kvs.add(new KeyValue(row1, fam1, col3, null));
Map<byte[], List<KeyValue>> deleteMap = new HashMap<byte[], List<KeyValue>>(); NavigableMap<byte[], List<? extends Cell>> deleteMap =
new TreeMap<byte[], List<? extends Cell>>(Bytes.BYTES_COMPARATOR);
deleteMap.put(fam1, kvs); deleteMap.put(fam1, kvs);
region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, true); region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, true);

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hbase.Cell;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
@ -795,17 +796,17 @@ public class TestMemStore extends TestCase {
//////////////////////////////////// ////////////////////////////////////
//Test for upsert with MSLAB //Test for upsert with MSLAB
//////////////////////////////////// ////////////////////////////////////
/** /**
* Test a pathological pattern that shows why we can't currently * Test a pathological pattern that shows why we can't currently
* use the MSLAB for upsert workloads. This test inserts data * use the MSLAB for upsert workloads. This test inserts data
* in the following pattern: * in the following pattern:
* *
* - row0001 through row1000 (fills up one 2M Chunk) * - row0001 through row1000 (fills up one 2M Chunk)
* - row0002 through row1001 (fills up another 2M chunk, leaves one reference * - row0002 through row1001 (fills up another 2M chunk, leaves one reference
* to the first chunk * to the first chunk
* - row0003 through row1002 (another chunk, another dangling reference) * - row0003 through row1002 (another chunk, another dangling reference)
* *
* This causes OOME pretty quickly if we use MSLAB for upsert * This causes OOME pretty quickly if we use MSLAB for upsert
* since each 2M chunk is held onto by a single reference. * since each 2M chunk is held onto by a single reference.
*/ */
@ -813,17 +814,17 @@ public class TestMemStore extends TestCase {
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
conf.setBoolean(MemStore.USEMSLAB_KEY, true); conf.setBoolean(MemStore.USEMSLAB_KEY, true);
memstore = new MemStore(conf, KeyValue.COMPARATOR); memstore = new MemStore(conf, KeyValue.COMPARATOR);
int ROW_SIZE = 2048; int ROW_SIZE = 2048;
byte[] qualifier = new byte[ROW_SIZE - 4]; byte[] qualifier = new byte[ROW_SIZE - 4];
MemoryMXBean bean = ManagementFactory.getMemoryMXBean(); MemoryMXBean bean = ManagementFactory.getMemoryMXBean();
for (int i = 0; i < 3; i++) { System.gc(); } for (int i = 0; i < 3; i++) { System.gc(); }
long usageBefore = bean.getHeapMemoryUsage().getUsed(); long usageBefore = bean.getHeapMemoryUsage().getUsed();
long size = 0; long size = 0;
long ts=0; long ts=0;
for (int newValue = 0; newValue < 1000; newValue++) { for (int newValue = 0; newValue < 1000; newValue++) {
for (int row = newValue; row < newValue + 1000; row++) { for (int row = newValue; row < newValue + 1000; row++) {
byte[] rowBytes = Bytes.toBytes(row); byte[] rowBytes = Bytes.toBytes(row);
@ -834,10 +835,10 @@ public class TestMemStore extends TestCase {
for (int i = 0; i < 3; i++) { System.gc(); } for (int i = 0; i < 3; i++) { System.gc(); }
long usageAfter = bean.getHeapMemoryUsage().getUsed(); long usageAfter = bean.getHeapMemoryUsage().getUsed();
System.out.println("Memory used: " + (usageAfter - usageBefore) System.out.println("Memory used: " + (usageAfter - usageBefore)
+ " (heapsize: " + memstore.heapSize() + + " (heapsize: " + memstore.heapSize() +
" size: " + size + ")"); " size: " + size + ")");
} }
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
// Helpers // Helpers
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -855,19 +856,19 @@ public class TestMemStore extends TestCase {
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
memstore = new MemStore(conf, KeyValue.COMPARATOR); memstore = new MemStore(conf, KeyValue.COMPARATOR);
long oldSize = memstore.size.get(); long oldSize = memstore.size.get();
List<KeyValue> l = new ArrayList<KeyValue>(); List<Cell> l = new ArrayList<Cell>();
KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v"); KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v");
KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v"); KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v");
kv1.setMvccVersion(1); kv2.setMvccVersion(1);kv3.setMvccVersion(1); kv1.setMvccVersion(1); kv2.setMvccVersion(1);kv3.setMvccVersion(1);
l.add(kv1); l.add(kv2); l.add(kv3); l.add(kv1); l.add(kv2); l.add(kv3);
this.memstore.upsert(l, 2);// readpoint is 2 this.memstore.upsert(l, 2);// readpoint is 2
long newSize = this.memstore.size.get(); long newSize = this.memstore.size.get();
assert(newSize > oldSize); assert(newSize > oldSize);
KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v"); KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v");
kv4.setMvccVersion(1); kv4.setMvccVersion(1);
l.clear(); l.add(kv4); l.clear(); l.add(kv4);

View File

@ -18,37 +18,33 @@
package org.apache.hadoop.hbase.regionserver.wal; package org.apache.hadoop.hbase.regionserver.wal;
import java.util.Map;
import java.util.List;
import java.util.Random;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.util.ToolRunner;
import org.apache.hbase.Cell;
/** /**
* This class runs performance benchmarks for {@link HLog}. * This class runs performance benchmarks for {@link HLog}.
@ -330,9 +326,11 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
return put; return put;
} }
private void addFamilyMapToWALEdit(Map<byte[], List<KeyValue>> familyMap, WALEdit walEdit) { private void addFamilyMapToWALEdit(Map<byte[], List<? extends Cell>> familyMap,
for (List<KeyValue> edits : familyMap.values()) { WALEdit walEdit) {
for (KeyValue kv : edits) { for (List<? extends Cell> edits : familyMap.values()) {
for (Cell cell : edits) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
walEdit.add(kv); walEdit.add(kv);
} }
} }

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTable;
@ -55,13 +56,12 @@ import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hbase.Cell;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import static org.junit.Assert.*;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters; import org.junit.runners.Parameterized.Parameters;
@ -217,11 +217,13 @@ public class TestCoprocessorScanPolicy {
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put, public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put,
final WALEdit edit, final boolean writeToWAL) throws IOException { final WALEdit edit, final boolean writeToWAL) throws IOException {
if (put.getAttribute("ttl") != null) { if (put.getAttribute("ttl") != null) {
KeyValue kv = put.getFamilyMap().values().iterator().next().get(0); Cell cell = put.getFamilyMap().values().iterator().next().get(0);
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
ttls.put(Bytes.toString(kv.getQualifier()), Bytes.toLong(kv.getValue())); ttls.put(Bytes.toString(kv.getQualifier()), Bytes.toLong(kv.getValue()));
c.bypass(); c.bypass();
} else if (put.getAttribute("versions") != null) { } else if (put.getAttribute("versions") != null) {
KeyValue kv = put.getFamilyMap().values().iterator().next().get(0); Cell cell = put.getFamilyMap().values().iterator().next().get(0);
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
versions.put(Bytes.toString(kv.getQualifier()), Bytes.toInt(kv.getValue())); versions.put(Bytes.toString(kv.getQualifier()), Bytes.toInt(kv.getValue()));
c.bypass(); c.bypass();
} }