HBASE-7215 Put, Delete, Increment, Result, all all HBase M/R classes still implement/use Writable

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1415412 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
larsh 2012-11-29 22:19:39 +00:00
parent e860e94ded
commit 726f822774
36 changed files with 307 additions and 1087 deletions

View File

@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.GenericOptionsParser;
@ -73,7 +72,7 @@ public class IndexBuilder {
* Internal Mapper to be run by Hadoop.
*/
public static class Map extends
Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Writable> {
Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put> {
private byte[] family;
private HashMap<byte[], ImmutableBytesWritable> indexes;

View File

@ -18,33 +18,22 @@
*/
package org.apache.hadoop.hbase.client;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
/*
/**
* A Get, Put or Delete associated with it's region. Used internally by
* {@link HTable::batch} to associate the action with it's region and maintain
* the index from the original request.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Action<R> implements Writable, Comparable {
public class Action<R> implements Comparable {
private Row action;
private int originalIndex;
private R result;
public Action() {
super();
}
/*
* This constructor is replaced by {@link #Action(Row, int)}
*/
@ -88,22 +77,4 @@ public class Action<R> implements Writable, Comparable {
public int compareTo(Object o) {
return action.compareTo(((Action) o).getAction());
}
// ///////////////////////////////////////////////////////////////////////////
// Writable
// ///////////////////////////////////////////////////////////////////////////
public void write(final DataOutput out) throws IOException {
HbaseObjectWritable.writeObject(out, action, Row.class, null);
out.writeInt(originalIndex);
HbaseObjectWritable.writeObject(out, result,
result != null ? result.getClass() : Writable.class, null);
}
public void readFields(final DataInput in) throws IOException {
this.action = (Row) HbaseObjectWritable.readObject(in, null);
this.originalIndex = in.readInt();
this.result = (R) HbaseObjectWritable.readObject(in, null);
}
}

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.client;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -43,8 +42,6 @@ import org.apache.hadoop.hbase.util.Bytes;
@InterfaceStability.Stable
public class Append extends Mutation {
private static final String RETURN_RESULTS = "_rr_";
private static final byte APPEND_VERSION = (byte)1;
/**
* @param returnResults
* True (default) if the append operation should return the results.
@ -90,5 +87,4 @@ public class Append extends Mutation {
familyMap.put(family, list);
return this;
}
}
}

View File

@ -24,10 +24,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -67,15 +64,7 @@ import java.util.Map;
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Delete extends Mutation
implements Writable, Comparable<Row> {
private static final byte DELETE_VERSION = (byte)3;
/** Constructor for Writable. DO NOT USE */
public Delete() {
this((byte [])null);
}
public class Delete extends Mutation implements Comparable<Row> {
/**
* Create a Delete operation for the specified row.
* <p>
@ -264,52 +253,4 @@ public class Delete extends Mutation
map.put("ts", this.ts);
return map;
}
//Writable
public void readFields(final DataInput in) throws IOException {
int version = in.readByte();
if (version > DELETE_VERSION) {
throw new IOException("version not supported");
}
this.row = Bytes.readByteArray(in);
this.ts = in.readLong();
this.lockId = in.readLong();
if (version > 2) {
this.writeToWAL = in.readBoolean();
}
this.familyMap.clear();
int numFamilies = in.readInt();
for(int i=0;i<numFamilies;i++) {
byte [] family = Bytes.readByteArray(in);
int numColumns = in.readInt();
List<KeyValue> list = new ArrayList<KeyValue>(numColumns);
for(int j=0;j<numColumns;j++) {
KeyValue kv = new KeyValue();
kv.readFields(in);
list.add(kv);
}
this.familyMap.put(family, list);
}
if (version > 1) {
readAttributes(in);
}
}
public void write(final DataOutput out) throws IOException {
out.writeByte(DELETE_VERSION);
Bytes.writeByteArray(out, this.row);
out.writeLong(this.ts);
out.writeLong(this.lockId);
out.writeBoolean(this.writeToWAL);
out.writeInt(familyMap.size());
for(Map.Entry<byte [], List<KeyValue>> entry : familyMap.entrySet()) {
Bytes.writeByteArray(out, entry.getKey());
List<KeyValue> list = entry.getValue();
out.writeInt(list.size());
for(KeyValue kv : list) {
kv.write(out);
}
}
writeAttributes(out);
}
}

View File

@ -65,10 +65,6 @@ import java.util.TreeSet;
public class Get extends OperationWithAttributes
implements Row, Comparable<Row> {
private static final byte VERSION_WITHOUT_PAGINATION = (byte) 2;
private static final byte VERSION_WITH_PAGINATION = (byte) 3;
private static final byte GET_VERSION = VERSION_WITH_PAGINATION;
private byte [] row = null;
private long lockId = -1L;
private int maxVersions = 1;
@ -80,14 +76,6 @@ public class Get extends OperationWithAttributes
private Map<byte [], NavigableSet<byte []>> familyMap =
new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
/** @return the most backward-compatible version for this scan possible for its parameters */
private byte getVersion() {
if (storeLimit != -1 || storeOffset != 0) {
return VERSION_WITH_PAGINATION;
}
return VERSION_WITHOUT_PAGINATION;
}
/**
* Create a Get operation for the specified row.
* <p>

View File

@ -18,8 +18,6 @@
*/
package org.apache.hadoop.hbase.client;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
@ -30,7 +28,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
/**
* Used to perform Increment operations on a single row.
@ -47,8 +44,6 @@ import org.apache.hadoop.io.Writable;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Increment implements Row {
private static final byte INCREMENT_VERSION = (byte)2;
private byte [] row = null;
private long lockId = -1L;
private boolean writeToWAL = true;
@ -275,72 +270,6 @@ public class Increment implements Row {
return sb.toString();
}
//Writable
public void readFields(final DataInput in)
throws IOException {
int version = in.readByte();
if (version > INCREMENT_VERSION) {
throw new IOException("unsupported version");
}
this.row = Bytes.readByteArray(in);
this.tr = new TimeRange();
tr.readFields(in);
this.lockId = in.readLong();
int numFamilies = in.readInt();
if (numFamilies == 0) {
throw new IOException("At least one column required");
}
this.familyMap =
new TreeMap<byte [],NavigableMap<byte [], Long>>(Bytes.BYTES_COMPARATOR);
for(int i=0; i<numFamilies; i++) {
byte [] family = Bytes.readByteArray(in);
boolean hasColumns = in.readBoolean();
NavigableMap<byte [], Long> set = null;
if(hasColumns) {
int numColumns = in.readInt();
set = new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
for(int j=0; j<numColumns; j++) {
byte [] qualifier = Bytes.readByteArray(in);
set.put(qualifier, in.readLong());
}
} else {
throw new IOException("At least one column required per family");
}
this.familyMap.put(family, set);
}
if (version > 1) {
this.writeToWAL = in.readBoolean();
}
}
public void write(final DataOutput out)
throws IOException {
out.writeByte(INCREMENT_VERSION);
Bytes.writeByteArray(out, this.row);
tr.write(out);
out.writeLong(this.lockId);
if (familyMap.size() == 0) {
throw new IOException("At least one column required");
}
out.writeInt(familyMap.size());
for(Map.Entry<byte [], NavigableMap<byte [], Long>> entry :
familyMap.entrySet()) {
Bytes.writeByteArray(out, entry.getKey());
NavigableMap<byte [], Long> columnSet = entry.getValue();
if(columnSet == null) {
throw new IOException("At least one column required per family");
} else {
out.writeBoolean(true);
out.writeInt(columnSet.size());
for(Map.Entry<byte [], Long> qualifier : columnSet.entrySet()) {
Bytes.writeByteArray(out, qualifier.getKey());
out.writeLong(qualifier.getValue());
}
}
}
out.writeBoolean(writeToWAL);
}
@Override
public int compareTo(Row i) {
return Bytes.compareTo(this.getRow(), i.getRow());

View File

@ -18,35 +18,29 @@
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.DataOutput;
import java.io.IOException;
import java.io.DataInput;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Container for Actions (i.e. Get, Delete, or Put), which are grouped by
* regionName. Intended to be used with HConnectionManager.processBatch()
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class MultiAction<R> implements Writable {
public final class MultiAction<R> {
// map of regions to lists of puts/gets/deletes for that region.
public Map<byte[], List<Action<R>>> actions =
new TreeMap<byte[], List<Action<R>>>(
Bytes.BYTES_COMPARATOR);
public Map<byte[], List<Action<R>>> actions = new TreeMap<byte[], List<Action<R>>>(Bytes.BYTES_COMPARATOR);
public MultiAction() {
super();
}
/**
@ -56,7 +50,7 @@ public final class MultiAction<R> implements Writable {
*/
public int size() {
int size = 0;
for (List l : actions.values()) {
for (List<?> l : actions.values()) {
size += l.size();
}
return size;
@ -93,33 +87,4 @@ public final class MultiAction<R> implements Writable {
}
return res;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(actions.size());
for (Map.Entry<byte[], List<Action<R>>> e : actions.entrySet()) {
Bytes.writeByteArray(out, e.getKey());
List<Action<R>> lst = e.getValue();
out.writeInt(lst.size());
for (Action a : lst) {
HbaseObjectWritable.writeObject(out, a, a.getClass(), null);
}
}
}
@Override
public void readFields(DataInput in) throws IOException {
actions.clear();
int mapSize = in.readInt();
for (int i = 0; i < mapSize; i++) {
byte[] key = Bytes.readByteArray(in);
int listSize = in.readInt();
List<Action<R>> lst = new ArrayList<Action<R>>(listSize);
for (int j = 0; j < listSize; j++) {
lst.add((Action) HbaseObjectWritable.readObject(in, null));
}
actions.put(key, lst);
}
}
}

View File

@ -1,239 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
/**
* @deprecated Use MultiAction instead
* Data type class for putting multiple regions worth of puts in one RPC.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class MultiPut extends Operation implements Writable {
public HServerAddress address; // client code ONLY
// TODO make this configurable
public static final int DEFAULT_MAX_PUT_OUTPUT = 10;
// map of regions to lists of puts for that region.
public Map<byte[], List<Put> > puts = new TreeMap<byte[], List<Put>>(Bytes.BYTES_COMPARATOR);
/**
* Writable constructor only.
*/
public MultiPut() {}
/**
* MultiPut for putting multiple regions worth of puts in one RPC.
* @param a address
*/
public MultiPut(HServerAddress a) {
address = a;
}
public int size() {
int size = 0;
for( List<Put> l : puts.values()) {
size += l.size();
}
return size;
}
public void add(byte[] regionName, Put aPut) {
List<Put> rsput = puts.get(regionName);
if (rsput == null) {
rsput = new ArrayList<Put>();
puts.put(regionName, rsput);
}
rsput.add(aPut);
}
public Collection<Put> allPuts() {
List<Put> res = new ArrayList<Put>();
for ( List<Put> pp : puts.values() ) {
res.addAll(pp);
}
return res;
}
/**
* Compile the table and column family (i.e. schema) information
* into a String. Useful for parsing and aggregation by debugging,
* logging, and administration tools.
* @return Map
*/
@Override
public Map<String, Object> getFingerprint() {
Map<String, Object> map = new HashMap<String, Object>();
// for extensibility, we have a map of table information that we will
// populate with only family information for each table
Map<String, Map> tableInfo =
new HashMap<String, Map>();
map.put("tables", tableInfo);
for (Map.Entry<byte[], List<Put>> entry : puts.entrySet()) {
// our fingerprint only concerns itself with which families are touched,
// not how many Puts touch them, so we use this Set to do just that.
Set<String> familySet;
try {
// since the puts are stored by region, we may have already
// recorded families for this region. if that is the case,
// we want to add to the existing Set. if not, we make a new Set.
String tableName = Bytes.toStringBinary(
HRegionInfo.parseRegionName(entry.getKey())[0]);
if (tableInfo.get(tableName) == null) {
Map<String, Object> table = new HashMap<String, Object>();
familySet = new TreeSet<String>();
table.put("families", familySet);
tableInfo.put(tableName, table);
} else {
familySet = (Set<String>) tableInfo.get(tableName).get("families");
}
} catch (IOException ioe) {
// in the case of parse error, default to labeling by region
Map<String, Object> table = new HashMap<String, Object>();
familySet = new TreeSet<String>();
table.put("families", familySet);
tableInfo.put(Bytes.toStringBinary(entry.getKey()), table);
}
// we now iterate through each Put and keep track of which families
// are affected in this table.
for (Put p : entry.getValue()) {
for (byte[] fam : p.getFamilyMap().keySet()) {
familySet.add(Bytes.toStringBinary(fam));
}
}
}
return map;
}
/**
* Compile the details beyond the scope of getFingerprint (mostly
* toMap from the Puts) into a Map along with the fingerprinted
* information. Useful for debugging, logging, and administration tools.
* @param maxCols a limit on the number of columns output prior to truncation
* @return Map
*/
@Override
public Map<String, Object> toMap(int maxCols) {
Map<String, Object> map = getFingerprint();
Map<String, Object> tableInfo = (Map<String, Object>) map.get("tables");
int putCount = 0;
for (Map.Entry<byte[], List<Put>> entry : puts.entrySet()) {
// If the limit has been hit for put output, just adjust our counter
if (putCount >= DEFAULT_MAX_PUT_OUTPUT) {
putCount += entry.getValue().size();
continue;
}
List<Put> regionPuts = entry.getValue();
List<Map<String, Object>> putSummaries =
new ArrayList<Map<String, Object>>();
// find out how many of this region's puts we can add without busting
// the maximum
int regionPutsToAdd = regionPuts.size();
putCount += regionPutsToAdd;
if (putCount > DEFAULT_MAX_PUT_OUTPUT) {
regionPutsToAdd -= putCount - DEFAULT_MAX_PUT_OUTPUT;
}
for (Iterator<Put> iter = regionPuts.iterator(); regionPutsToAdd-- > 0;) {
putSummaries.add(iter.next().toMap(maxCols));
}
// attempt to extract the table name from the region name
String tableName = "";
try {
tableName = Bytes.toStringBinary(
HRegionInfo.parseRegionName(entry.getKey())[0]);
} catch (IOException ioe) {
// in the case of parse error, default to labeling by region
tableName = Bytes.toStringBinary(entry.getKey());
}
// since the puts are stored by region, we may have already
// recorded puts for this table. if that is the case,
// we want to add to the existing List. if not, we place a new list
// in the map
Map<String, Object> table =
(Map<String, Object>) tableInfo.get(tableName);
if (table == null) {
// in case the Put has changed since getFingerprint's map was built
table = new HashMap<String, Object>();
tableInfo.put(tableName, table);
table.put("puts", putSummaries);
} else if (table.get("puts") == null) {
table.put("puts", putSummaries);
} else {
((List<Map<String, Object>>) table.get("puts")).addAll(putSummaries);
}
}
map.put("totalPuts", putCount);
return map;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(puts.size());
for( Map.Entry<byte[],List<Put>> e : puts.entrySet()) {
Bytes.writeByteArray(out, e.getKey());
List<Put> ps = e.getValue();
out.writeInt(ps.size());
for( Put p : ps ) {
p.write(out);
}
}
}
@Override
public void readFields(DataInput in) throws IOException {
puts.clear();
int mapSize = in.readInt();
for (int i = 0 ; i < mapSize; i++) {
byte[] key = Bytes.readByteArray(in);
int listSize = in.readInt();
List<Put> ps = new ArrayList<Put>(listSize);
for ( int j = 0 ; j < listSize; j++ ) {
Put put = new Put();
put.readFields(in);
ps.add(put);
}
puts.put(key, ps);
}
}
}

View File

@ -1,72 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
/**
* @deprecated Replaced by MultiResponse
* Response class for MultiPut.
*/
public class MultiPutResponse implements Writable {
protected MultiPut request; // used in client code ONLY
protected Map<byte[], Integer> answers = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
public MultiPutResponse() {}
public void addResult(byte[] regionName, int result) {
answers.put(regionName, result);
}
public Integer getAnswer(byte[] region) {
return answers.get(region);
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(answers.size());
for( Map.Entry<byte[],Integer> e : answers.entrySet()) {
Bytes.writeByteArray(out, e.getKey());
out.writeInt(e.getValue());
}
}
@Override
public void readFields(DataInput in) throws IOException {
answers.clear();
int mapSize = in.readInt();
for( int i = 0 ; i < mapSize ; i++ ) {
byte[] key = Bytes.readByteArray(in);
int value = in.readInt();
answers.put(key, value);
}
}
}

View File

@ -19,21 +19,11 @@
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.StringUtils;
import java.io.DataOutput;
import java.io.IOException;
import java.io.DataInput;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@ -45,7 +35,7 @@ import java.util.TreeMap;
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class MultiResponse implements Writable {
public class MultiResponse {
// map of regionName to list of (Results paired to the original index for that
// Result)
@ -53,6 +43,7 @@ public class MultiResponse implements Writable {
new TreeMap<byte[], List<Pair<Integer, Object>>>(Bytes.BYTES_COMPARATOR);
public MultiResponse() {
super();
}
/**
@ -91,80 +82,4 @@ public class MultiResponse implements Writable {
public Map<byte[], List<Pair<Integer, Object>>> getResults() {
return results;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(results.size());
for (Map.Entry<byte[], List<Pair<Integer, Object>>> e : results.entrySet()) {
Bytes.writeByteArray(out, e.getKey());
List<Pair<Integer, Object>> lst = e.getValue();
out.writeInt(lst.size());
for (Pair<Integer, Object> r : lst) {
if (r == null) {
out.writeInt(-1); // Cant have index -1; on other side we recognize -1 as 'null'
} else {
out.writeInt(r.getFirst()); // Can this can npe!?!
Object obj = r.getSecond();
if (obj instanceof Throwable) {
out.writeBoolean(true); // true, Throwable/exception.
Throwable t = (Throwable) obj;
// serialize exception
WritableUtils.writeString(out, t.getClass().getName());
WritableUtils.writeString(out,
StringUtils.stringifyException(t));
} else {
out.writeBoolean(false); // no exception
if (! (obj instanceof Writable))
obj = null; // squash all non-writables to null.
HbaseObjectWritable.writeObject(out, r.getSecond(),
obj != null ? obj.getClass() : Writable.class, null);
}
}
}
}
}
@Override
public void readFields(DataInput in) throws IOException {
results.clear();
int mapSize = in.readInt();
for (int i = 0; i < mapSize; i++) {
byte[] key = Bytes.readByteArray(in);
int listSize = in.readInt();
List<Pair<Integer, Object>> lst = new ArrayList<Pair<Integer, Object>>(
listSize);
for (int j = 0; j < listSize; j++) {
Integer idx = in.readInt();
if (idx == -1) {
lst.add(null);
} else {
boolean isException = in.readBoolean();
Object o = null;
if (isException) {
String klass = WritableUtils.readString(in);
String desc = WritableUtils.readString(in);
try {
// the type-unsafe insertion, but since we control what klass is..
Class<? extends Throwable> c = (Class<? extends Throwable>) Class.forName(klass);
Constructor<? extends Throwable> cn = c.getDeclaredConstructor(String.class);
o = cn.newInstance(desc);
} catch (ClassNotFoundException ignored) {
} catch (NoSuchMethodException ignored) {
} catch (InvocationTargetException ignored) {
} catch (InstantiationException ignored) {
} catch (IllegalAccessException ignored) {
}
} else {
o = HbaseObjectWritable.readObject(in, null);
}
lst.add(new Pair<Integer, Object>(idx, o));
}
}
results.put(key, lst);
}
}
}
}

View File

@ -19,9 +19,6 @@
package org.apache.hadoop.hbase.client;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -30,7 +27,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.WritableUtils;
@InterfaceAudience.Public
@InterfaceStability.Evolving
@ -87,30 +83,6 @@ public abstract class OperationWithAttributes extends Operation implements Attri
return size;
}
protected void writeAttributes(final DataOutput out) throws IOException {
if (this.attributes == null) {
out.writeInt(0);
} else {
out.writeInt(this.attributes.size());
for (Map.Entry<String, byte[]> attr : this.attributes.entrySet()) {
WritableUtils.writeString(out, attr.getKey());
Bytes.writeByteArray(out, attr.getValue());
}
}
}
protected void readAttributes(final DataInput in) throws IOException {
int numAttributes = in.readInt();
if (numAttributes > 0) {
this.attributes = new HashMap<String, byte[]>(numAttributes);
for(int i=0; i<numAttributes; i++) {
String name = WritableUtils.readString(in);
byte[] value = Bytes.readByteArray(in);
this.attributes.put(name, value);
}
}
}
/**
* This method allows you to set an identifier on an operation. The original
* motivation for this was to allow the identifier to be used in slow query

View File

@ -26,10 +26,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@ -46,18 +43,12 @@ import java.util.TreeMap;
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Put extends Mutation
implements HeapSize, Writable, Comparable<Row> {
private static final byte PUT_VERSION = (byte)2;
public class Put extends Mutation implements HeapSize, Comparable<Row> {
private static final long OVERHEAD = ClassSize.align(
ClassSize.OBJECT + 2 * ClassSize.REFERENCE +
2 * Bytes.SIZEOF_LONG + Bytes.SIZEOF_BOOLEAN +
ClassSize.REFERENCE + ClassSize.TREEMAP);
/** Constructor for Writable. DO NOT USE */
public Put() {}
/**
* Create a Put operation for the specified row.
* @param row row key
@ -363,62 +354,4 @@ public class Put extends Mutation
return ClassSize.align((int)heapsize);
}
//Writable
public void readFields(final DataInput in)
throws IOException {
int version = in.readByte();
if (version > PUT_VERSION) {
throw new IOException("version not supported");
}
this.row = Bytes.readByteArray(in);
this.ts = in.readLong();
this.lockId = in.readLong();
this.writeToWAL = in.readBoolean();
int numFamilies = in.readInt();
if (!this.familyMap.isEmpty()) this.familyMap.clear();
for(int i=0;i<numFamilies;i++) {
byte [] family = Bytes.readByteArray(in);
int numKeys = in.readInt();
List<KeyValue> keys = new ArrayList<KeyValue>(numKeys);
int totalLen = in.readInt();
byte [] buf = new byte[totalLen];
int offset = 0;
for (int j = 0; j < numKeys; j++) {
int keyLength = in.readInt();
in.readFully(buf, offset, keyLength);
keys.add(new KeyValue(buf, offset, keyLength));
offset += keyLength;
}
this.familyMap.put(family, keys);
}
if (version > 1) {
readAttributes(in);
}
}
public void write(final DataOutput out)
throws IOException {
out.writeByte(PUT_VERSION);
Bytes.writeByteArray(out, this.row);
out.writeLong(this.ts);
out.writeLong(this.lockId);
out.writeBoolean(this.writeToWAL);
out.writeInt(familyMap.size());
for (Map.Entry<byte [], List<KeyValue>> entry : familyMap.entrySet()) {
Bytes.writeByteArray(out, entry.getKey());
List<KeyValue> keys = entry.getValue();
out.writeInt(keys.size());
int totalLen = 0;
for(KeyValue kv : keys) {
totalLen += kv.getLength();
}
out.writeInt(totalLen);
for(KeyValue kv : keys) {
out.writeInt(kv.getLength());
out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
}
}
writeAttributes(out);
}
}

View File

@ -19,9 +19,6 @@
package org.apache.hadoop.hbase.client;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -36,10 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.SplitKeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.WritableWithSize;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
/**
* Single row result of a {@link Get} or {@link Scan} query.<p>
@ -64,32 +58,36 @@ import org.apache.hadoop.io.Writable;
* an HBase cell defined by the row, family, qualifier, timestamp, and value.<p>
*
* The underlying {@link KeyValue} objects can be accessed through the method {@link #list()}.
* Each KeyValue can then be accessed
* through {@link KeyValue#getRow()}, {@link KeyValue#getFamily()}, {@link KeyValue#getQualifier()},
* {@link KeyValue#getTimestamp()}, and {@link KeyValue#getValue()}.
* Each KeyValue can then be accessed through
* {@link KeyValue#getRow()}, {@link KeyValue#getFamily()}, {@link KeyValue#getQualifier()},
* {@link KeyValue#getTimestamp()}, and {@link KeyValue#getValue()}.<p>
*
* If you need to overwrite a Result with another Result instance -- as in the old 'mapred' RecordReader next
* invocations -- then create an empty Result with the null constructor and in then use {@link #copyFrom(Result)}
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Result implements Writable, WritableWithSize {
private static final byte RESULT_VERSION = (byte)1;
private static final int DEFAULT_BUFFER_SIZE = 1024;
private KeyValue [] kvs = null;
private NavigableMap<byte[],
NavigableMap<byte[], NavigableMap<Long, byte[]>>> familyMap = null;
public class Result {
private KeyValue [] kvs;
// We're not using java serialization. Transient here is just a marker to say
// that this is where we cache row if we're ever asked for it.
private transient byte [] row = null;
private ImmutableBytesWritable bytes = null;
// Ditto for familyMap. It can be composed on fly from passed in kvs.
private transient NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> familyMap = null;
// never use directly
private static byte [] buffer = null;
private static final int PAD_WIDTH = 128;
/**
* Constructor used for Writable.
* Creates an empty Result w/ no KeyValue payload; returns null if you call {@link #raw()}.
* Use this to represent no results if <code>null</code> won't do or in old 'mapred' as oppposed to 'mapreduce' package
* MapReduce where you need to overwrite a Result
* instance with a {@link #copyFrom(Result)} call.
*/
public Result() {}
public Result() {
super();
}
/**
* Instantiate a Result with the specified array of KeyValues.
@ -98,9 +96,7 @@ public class Result implements Writable, WritableWithSize {
* @param kvs array of KeyValues
*/
public Result(KeyValue [] kvs) {
if(kvs != null && kvs.length > 0) {
this.kvs = kvs;
}
this.kvs = kvs;
}
/**
@ -110,15 +106,7 @@ public class Result implements Writable, WritableWithSize {
* @param kvs List of KeyValues
*/
public Result(List<KeyValue> kvs) {
this(kvs.toArray(new KeyValue[0]));
}
/**
* Instantiate a Result from the specified raw binary format.
* @param bytes raw binary format of Result
*/
public Result(ImmutableBytesWritable bytes) {
this.bytes = bytes;
this(kvs.toArray(new KeyValue[kvs.size()]));
}
/**
@ -128,10 +116,7 @@ public class Result implements Writable, WritableWithSize {
*/
public byte [] getRow() {
if (this.row == null) {
if(this.kvs == null) {
readFields();
}
this.row = this.kvs.length == 0? null: this.kvs[0].getRow();
this.row = this.kvs == null || this.kvs.length == 0? null: this.kvs[0].getRow();
}
return this.row;
}
@ -154,12 +139,9 @@ public class Result implements Writable, WritableWithSize {
*
* This API is faster than using getFamilyMap() and getMap()
*
* @return array of KeyValues
* @return array of KeyValues; can be null if nothing in the result
*/
public KeyValue[] raw() {
if(this.kvs == null) {
readFields();
}
return kvs;
}
@ -171,9 +153,6 @@ public class Result implements Writable, WritableWithSize {
* @return The sorted list of KeyValue's.
*/
public List<KeyValue> list() {
if(this.kvs == null) {
readFields();
}
return isEmpty()? null: Arrays.asList(raw());
}
@ -544,15 +523,13 @@ public class Result implements Writable, WritableWithSize {
* @return map from families to qualifiers to versions
*/
public NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> getMap() {
if(this.familyMap != null) {
if (this.familyMap != null) {
return this.familyMap;
}
if(isEmpty()) {
return null;
}
this.familyMap =
new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
(Bytes.BYTES_COMPARATOR);
this.familyMap = new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(Bytes.BYTES_COMPARATOR);
for(KeyValue kv : this.kvs) {
SplitKeyValue splitKV = kv.split();
byte [] family = splitKV.getFamily();
@ -653,26 +630,11 @@ public class Result implements Writable, WritableWithSize {
return kvs[0].getValue();
}
/**
* Returns the raw binary encoding of this Result.<p>
*
* Please note, there may be an offset into the underlying byte array of the
* returned ImmutableBytesWritable. Be sure to use both
* {@link ImmutableBytesWritable#get()} and {@link ImmutableBytesWritable#getOffset()}
* @return pointer to raw binary of Result
*/
public ImmutableBytesWritable getBytes() {
return this.bytes;
}
/**
* Check if the underlying KeyValue [] is empty or not
* @return true if empty
*/
public boolean isEmpty() {
if(this.kvs == null) {
readFields();
}
return this.kvs == null || this.kvs.length == 0;
}
@ -680,9 +642,6 @@ public class Result implements Writable, WritableWithSize {
* @return the size of the underlying KeyValue []
*/
public int size() {
if(this.kvs == null) {
readFields();
}
return this.kvs == null? 0: this.kvs.length;
}
@ -711,179 +670,6 @@ public class Result implements Writable, WritableWithSize {
return sb.toString();
}
//Writable
public void readFields(final DataInput in)
throws IOException {
familyMap = null;
row = null;
kvs = null;
int totalBuffer = in.readInt();
if(totalBuffer == 0) {
bytes = null;
return;
}
byte [] raw = new byte[totalBuffer];
readChunked(in, raw, 0, totalBuffer);
bytes = new ImmutableBytesWritable(raw, 0, totalBuffer);
}
private void readChunked(final DataInput in, byte[] dest, int ofs, int len)
throws IOException {
int maxRead = 8192;
for (; ofs < len; ofs += maxRead)
in.readFully(dest, ofs, Math.min(len - ofs, maxRead));
}
//Create KeyValue[] when needed
private void readFields() {
if (bytes == null) {
this.kvs = new KeyValue[0];
return;
}
byte [] buf = bytes.get();
int offset = bytes.getOffset();
int finalOffset = bytes.getSize() + offset;
List<KeyValue> kvs = new ArrayList<KeyValue>();
while(offset < finalOffset) {
int keyLength = Bytes.toInt(buf, offset);
offset += Bytes.SIZEOF_INT;
kvs.add(new KeyValue(buf, offset, keyLength));
offset += keyLength;
}
this.kvs = kvs.toArray(new KeyValue[kvs.size()]);
}
public long getWritableSize() {
if (isEmpty())
return Bytes.SIZEOF_INT; // int size = 0
long size = Bytes.SIZEOF_INT; // totalLen
for (KeyValue kv : kvs) {
size += kv.getLength();
size += Bytes.SIZEOF_INT; // kv.getLength
}
return size;
}
public void write(final DataOutput out)
throws IOException {
if(isEmpty()) {
out.writeInt(0);
} else {
int totalLen = 0;
for(KeyValue kv : kvs) {
totalLen += kv.getLength() + Bytes.SIZEOF_INT;
}
out.writeInt(totalLen);
for(KeyValue kv : kvs) {
out.writeInt(kv.getLength());
out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
}
}
}
public static long getWriteArraySize(Result [] results) {
long size = Bytes.SIZEOF_BYTE; // RESULT_VERSION
if (results == null || results.length == 0) {
size += Bytes.SIZEOF_INT;
return size;
}
size += Bytes.SIZEOF_INT; // results.length
size += Bytes.SIZEOF_INT; // bufLen
for (Result result : results) {
size += Bytes.SIZEOF_INT; // either 0 or result.size()
if (result == null || result.isEmpty())
continue;
for (KeyValue kv : result.raw()) {
size += Bytes.SIZEOF_INT; // kv.getLength();
size += kv.getLength();
}
}
return size;
}
public static void writeArray(final DataOutput out, Result [] results)
throws IOException {
// Write version when writing array form.
// This assumes that results are sent to the client as Result[], so we
// have an opportunity to handle version differences without affecting
// efficiency.
out.writeByte(RESULT_VERSION);
if(results == null || results.length == 0) {
out.writeInt(0);
return;
}
out.writeInt(results.length);
int bufLen = 0;
for(Result result : results) {
bufLen += Bytes.SIZEOF_INT;
if(result == null || result.isEmpty()) {
continue;
}
for(KeyValue key : result.raw()) {
bufLen += key.getLength() + Bytes.SIZEOF_INT;
}
}
out.writeInt(bufLen);
for(Result result : results) {
if(result == null || result.isEmpty()) {
out.writeInt(0);
continue;
}
out.writeInt(result.size());
for(KeyValue kv : result.raw()) {
out.writeInt(kv.getLength());
out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
}
}
}
public static Result [] readArray(final DataInput in)
throws IOException {
// Read version for array form.
// This assumes that results are sent to the client as Result[], so we
// have an opportunity to handle version differences without affecting
// efficiency.
int version = in.readByte();
if (version > RESULT_VERSION) {
throw new IOException("version not supported");
}
int numResults = in.readInt();
if(numResults == 0) {
return new Result[0];
}
Result [] results = new Result[numResults];
int bufSize = in.readInt();
byte [] buf = new byte[bufSize];
int offset = 0;
for(int i=0;i<numResults;i++) {
int numKeys = in.readInt();
offset += Bytes.SIZEOF_INT;
if(numKeys == 0) {
results[i] = new Result((ImmutableBytesWritable)null);
continue;
}
int initialOffset = offset;
for(int j=0;j<numKeys;j++) {
int keyLen = in.readInt();
Bytes.putInt(buf, offset, keyLen);
offset += Bytes.SIZEOF_INT;
in.readFully(buf, offset, keyLen);
offset += keyLen;
}
int totalLength = offset - initialOffset;
results[i] = new Result(new ImmutableBytesWritable(buf, initialOffset,
totalLength));
}
return results;
}
/**
* Does a deep comparison of two Results, down to the byte arrays.
* @param res1 first result to compare
@ -892,7 +678,7 @@ public class Result implements Writable, WritableWithSize {
*/
public static void compareResults(Result res1, Result res2)
throws Exception {
if (res2 == null) {
if (res2 == null) {
throw new Exception("There wasn't enough rows, we stopped at "
+ Bytes.toStringBinary(res1.getRow()));
}
@ -910,4 +696,14 @@ public class Result implements Writable, WritableWithSize {
}
}
}
}
/**
* Copy another Result into this one. Needed for the old Mapred framework
* @param other
*/
public void copyFrom(Result other) {
this.row = null;
this.familyMap = null;
this.kvs = other.kvs;
}
}

View File

@ -85,12 +85,6 @@ public class Scan extends OperationWithAttributes {
private static final String RAW_ATTR = "_raw_";
private static final String ISOLATION_LEVEL = "_isolationlevel_";
private static final byte VERSION_WITH_PAGINATION = (byte)4;
private static final byte VERSION_WITH_RESULT_SIZE = (byte)3;
private static final byte VERSION_WITH_ATTRIBUTES = (byte)2;
private static final byte SCAN_VERSION = VERSION_WITH_PAGINATION;
private byte [] startRow = HConstants.EMPTY_START_ROW;
private byte [] stopRow = HConstants.EMPTY_END_ROW;
private int maxVersions = 1;
@ -117,22 +111,6 @@ public class Scan extends OperationWithAttributes {
private Map<byte [], NavigableSet<byte []>> familyMap =
new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
/**
* @return the most backward-compatible version for this scan possible for its parameters
*/
private byte getVersion() {
if (storeLimit != -1 || storeOffset != 0) {
return VERSION_WITH_PAGINATION;
}
if (maxResultSize != -1) {
return VERSION_WITH_RESULT_SIZE;
}
if (getAttributeSize() != 0) {
return VERSION_WITH_ATTRIBUTES;
}
return 1;
}
/**
* Create a Scan operation across all rows.
*/

View File

@ -230,6 +230,8 @@ public class ScannerCallable extends ServerCallable<Result[]> {
if (this.scanMetrics == null || rrs == null) {
return;
}
/*
* broken by protobufs
for (Result rr : rrs) {
if (rr.getBytes() != null) {
this.scanMetrics.countOfBytesInResults.inc(rr.getBytes().getLength());
@ -239,6 +241,7 @@ public class ScannerCallable extends ServerCallable<Result[]> {
}
}
}
*/
}
private void close() {

View File

@ -414,22 +414,6 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur
public static long getWritableSize(Object instance, Class declaredClass,
Configuration conf) {
long size = Bytes.SIZEOF_BYTE; // code
if (instance == null) {
return 0L;
}
if (declaredClass.isArray()) {
if (declaredClass.equals(Result[].class)) {
return size + Result.getWriteArraySize((Result[])instance);
}
}
if (declaredClass.equals(Result.class)) {
Result r = (Result) instance;
// one extra class code for writable instance.
return r.getWritableSize() + size + Bytes.SIZEOF_BYTE;
}
return 0L; // no hint is the default.
}
/**
@ -460,8 +444,6 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur
// byte-at-a-time we were previously doing.
if (declClass.equals(byte [].class)) {
Bytes.writeByteArray(out, (byte [])instanceObj);
} else if(declClass.equals(Result [].class)) {
Result.writeArray(out, (Result [])instanceObj);
} else {
//if it is a Generic array, write the element's type
if (getClassCode(declaredClass) == GENERIC_ARRAY_CODE) {
@ -641,8 +623,6 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur
} else if (declaredClass.isArray()) { // array
if (declaredClass.equals(byte [].class)) {
instance = Bytes.readByteArray(in);
} else if(declaredClass.equals(Result [].class)) {
instance = Result.readArray(in);
} else {
int length = in.readInt();
instance = Array.newInstance(declaredClass.getComponentType(), length);

View File

@ -83,7 +83,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.util.ReflectionUtils;
import org.cloudera.htrace.Span;
import org.cloudera.htrace.Trace;
@ -104,7 +103,7 @@ import com.google.protobuf.Message.Builder;
@InterfaceAudience.Private
public class HBaseClient {
private static final Log LOG = LogFactory
public static final Log LOG = LogFactory
.getLog("org.apache.hadoop.ipc.HBaseClient");
protected final PoolMap<ConnectionId, Connection> connections;

View File

@ -32,7 +32,7 @@ import org.apache.hadoop.mapred.Mapper;
* @param <V> Writable value class
*/
@Deprecated
public interface TableMap<K extends WritableComparable<? super K>, V extends Writable>
public interface TableMap<K extends WritableComparable<? super K>, V>
extends Mapper<ImmutableBytesWritable, Result, K, V> {
}

View File

@ -24,6 +24,8 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@ -81,6 +83,8 @@ public class TableMapReduceUtil {
job.setMapOutputValueClass(outputValueClass);
job.setMapOutputKeyClass(outputKeyClass);
job.setMapperClass(mapper);
job.setStrings("io.serializations", job.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName());
FileInputFormat.addInputPaths(job, table);
job.set(TableInputFormat.COLUMN_LIST, columns);
if (addDependencyJars) {
@ -151,6 +155,8 @@ public class TableMapReduceUtil {
job.set(TableOutputFormat.OUTPUT_TABLE, table);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
job.setStrings("io.serializations", job.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName());
if (partitioner == HRegionPartitioner.class) {
job.setPartitionerClass(HRegionPartitioner.class);
HTable outputTable = new HTable(HBaseConfiguration.create(job), table);

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.util.StringUtils;
import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
@ -226,7 +225,7 @@ public class TableRecordReaderImpl {
if (result != null && result.size() > 0) {
key.set(result.getRow());
lastSuccessfulRow = key.get();
Writables.copyWritable(result, value);
value.copyFrom(result);
return true;
}
return false;

View File

@ -32,7 +32,7 @@ import org.apache.hadoop.mapred.Reducer;
*/
@Deprecated
@SuppressWarnings("unchecked")
public interface TableReduce<K extends WritableComparable, V extends Writable>
public interface TableReduce<K extends WritableComparable, V>
extends Reducer<K, V, ImmutableBytesWritable, Put> {
}

View File

@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.OutputFormat;
@ -52,7 +53,7 @@ import org.apache.hadoop.mapreduce.OutputFormat;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class IdentityTableReducer
extends TableReducer<Writable, Writable, Writable> {
extends TableReducer<Writable, Mutation, Writable> {
@SuppressWarnings("unused")
private static final Log LOG = LogFactory.getLog(IdentityTableReducer.class);
@ -72,9 +73,9 @@ extends TableReducer<Writable, Writable, Writable> {
* @throws InterruptedException When the job gets interrupted.
*/
@Override
public void reduce(Writable key, Iterable<Writable> values, Context context)
public void reduce(Writable key, Iterable<Mutation> values, Context context)
throws IOException, InterruptedException {
for(Writable putOrDelete : values) {
for(Mutation putOrDelete : values) {
context.write(key, putOrDelete);
}
}

View File

@ -261,6 +261,9 @@ public class ImportTsv {
Class mapperClass = mapperClassName != null ?
Class.forName(mapperClassName) : DEFAULT_MAPPER;
conf.setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName());
String tableName = args[0];
Path inputDir = new Path(args[1]);
Job job = new Job(conf, NAME + "_" + tableName);

View File

@ -30,10 +30,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
@ -58,7 +58,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Writable> {
public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Mutation> {
/** Set this to {@link #WAL_OFF} to turn off write-ahead logging (HLog) */
public static final String WAL_PROPERTY = "hbase.mapreduce.multitableoutputformat.wal";
/** Property value to use write-ahead logging */
@ -69,7 +69,7 @@ public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable,
* Record writer for outputting to multiple HTables.
*/
protected static class MultiTableRecordWriter extends
RecordWriter<ImmutableBytesWritable, Writable> {
RecordWriter<ImmutableBytesWritable, Mutation> {
private static final Log LOG = LogFactory.getLog(MultiTableRecordWriter.class);
Map<ImmutableBytesWritable, HTable> tables;
Configuration conf;
@ -126,7 +126,7 @@ public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable,
* if the action is not a put or a delete.
*/
@Override
public void write(ImmutableBytesWritable tableName, Writable action) throws IOException {
public void write(ImmutableBytesWritable tableName, Mutation action) throws IOException {
HTable table = getTable(tableName);
// The actions are not immutable, so we defensively copy them
if (action instanceof Put) {
@ -156,7 +156,7 @@ public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable,
}
@Override
public RecordWriter<ImmutableBytesWritable, Writable> getRecordWriter(TaskAttemptContext context)
public RecordWriter<ImmutableBytesWritable, Mutation> getRecordWriter(TaskAttemptContext context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
return new MultiTableRecordWriter(HBaseConfiguration.create(conf),

View File

@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.Serializer;
public class MutationSerialization implements Serialization<Mutation> {
@Override
public boolean accept(Class<?> c) {
return Mutation.class.isAssignableFrom(c);
}
@Override
public Deserializer<Mutation> getDeserializer(Class<Mutation> c) {
return new MutationDeserializer();
}
@Override
public Serializer<Mutation> getSerializer(Class<Mutation> c) {
return new MutationSerializer();
}
private static class MutationDeserializer implements Deserializer<Mutation> {
private InputStream in;
@Override
public void close() throws IOException {
in.close();
}
@Override
public Mutation deserialize(Mutation mutation) throws IOException {
Mutate proto = Mutate.parseDelimitedFrom(in);
return ProtobufUtil.toMutation(proto);
}
@Override
public void open(InputStream in) throws IOException {
this.in = in;
}
}
private static class MutationSerializer implements Serializer<Mutation> {
private OutputStream out;
@Override
public void close() throws IOException {
out.close();
}
@Override
public void open(OutputStream out) throws IOException {
this.out = out;
}
@Override
public void serialize(Mutation mutation) throws IOException {
MutateType type;
if (mutation instanceof Put) {
type = MutateType.PUT;
} else if (mutation instanceof Delete) {
type = MutateType.DELETE;
} else {
throw new IllegalArgumentException("Only Put and Delete are supported");
}
ProtobufUtil.toMutate(type, mutation).writeDelimitedTo(out);
}
}
}

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.Serializer;
public class ResultSerialization implements Serialization<Result> {
@Override
public boolean accept(Class<?> c) {
return Result.class.isAssignableFrom(c);
}
@Override
public Deserializer<Result> getDeserializer(Class<Result> c) {
return new ResultDeserializer();
}
@Override
public Serializer<Result> getSerializer(Class<Result> c) {
return new ResultSerializer();
}
private static class ResultDeserializer implements Deserializer<Result> {
private InputStream in;
@Override
public void close() throws IOException {
in.close();
}
@Override
public Result deserialize(Result mutation) throws IOException {
ClientProtos.Result proto =
ClientProtos.Result.parseDelimitedFrom(in);
return ProtobufUtil.toResult(proto);
}
@Override
public void open(InputStream in) throws IOException {
this.in = in;
}
}
private static class ResultSerializer implements Serializer<Result> {
private OutputStream out;
@Override
public void close() throws IOException {
out.close();
}
@Override
public void open(OutputStream out) throws IOException {
this.out = out;
}
@Override
public void serialize(Result result) throws IOException {
ProtobufUtil.toResult(result).writeDelimitedTo(out);
}
}
}

View File

@ -18,10 +18,6 @@
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.InvocationTargetException;
@ -36,9 +32,9 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
@ -49,9 +45,7 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;
@ -141,6 +135,8 @@ public class TableMapReduceUtil {
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
conf.set(TableInputFormat.INPUT_TABLE, table);
conf.set(TableInputFormat.SCAN, convertScanToString(scan));
conf.setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName());
if (addDependencyJars) {
addDependencyJars(job);
}
@ -363,6 +359,8 @@ public class TableMapReduceUtil {
job.setOutputFormatClass(TableOutputFormat.class);
if (reducer != null) job.setReducerClass(reducer);
conf.set(TableOutputFormat.OUTPUT_TABLE, table);
conf.setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName());
// If passed a quorum/ensemble address, pass it on to TableOutputFormat.
if (quorumAddress != null) {
// Calling this will validate the format

View File

@ -30,9 +30,9 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
@ -48,7 +48,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableOutputFormat<KEY> extends OutputFormat<KEY, Writable>
public class TableOutputFormat<KEY> extends OutputFormat<KEY, Mutation>
implements Configurable {
private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
@ -85,7 +85,7 @@ implements Configurable {
* @param <KEY> The type of the key.
*/
protected static class TableRecordWriter<KEY>
extends RecordWriter<KEY, Writable> {
extends RecordWriter<KEY, Mutation> {
/** The table to write to. */
private HTable table;
@ -121,7 +121,7 @@ implements Configurable {
* @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object)
*/
@Override
public void write(KEY key, Writable value)
public void write(KEY key, Mutation value)
throws IOException {
if (value instanceof Put) this.table.put(new Put((Put)value));
else if (value instanceof Delete) this.table.delete(new Delete((Delete)value));
@ -139,7 +139,7 @@ implements Configurable {
* @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
*/
@Override
public RecordWriter<KEY, Writable> getRecordWriter(
public RecordWriter<KEY, Mutation> getRecordWriter(
TaskAttemptContext context)
throws IOException, InterruptedException {
return new TableRecordWriter<KEY>(this.table);

View File

@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.mapreduce;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.mapreduce.Reducer;
/**
@ -43,5 +43,5 @@ import org.apache.hadoop.mapreduce.Reducer;
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
extends Reducer<KEYIN, VALUEIN, KEYOUT, Writable> {
extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> {
}

View File

@ -132,7 +132,6 @@ import org.apache.hadoop.hbase.util.HashedBytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils;
import org.cliffc.high_scale_lib.Counter;
@ -2372,7 +2371,7 @@ public class HRegion implements HeapSize { // , Writable{
* @return true if the new put was executed, false otherwise
*/
public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
CompareOp compareOp, ByteArrayComparable comparator, Writable w,
CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
Integer lockId, boolean writeToWAL)
throws IOException{
checkReadOnly();
@ -2458,8 +2457,7 @@ public class HRegion implements HeapSize { // , Writable{
@SuppressWarnings("unchecked")
private void doBatchMutate(Mutation mutation, Integer lid) throws IOException,
DoNotRetryIOException {
Pair<Mutation, Integer>[] mutateWithLocks = new Pair[] { new Pair<Mutation, Integer>(mutation,
lid) };
Pair<Mutation, Integer>[] mutateWithLocks = new Pair[] { new Pair<Mutation, Integer>(mutation, lid) };
OperationStatus[] batchMutate = this.batchMutate(mutateWithLocks);
if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
@ -2546,13 +2544,13 @@ public class HRegion implements HeapSize { // , Writable{
* @praram now
* @throws IOException
*/
private void put(byte [] family, List<KeyValue> edits, Integer lid)
private void put(final byte [] row, byte [] family, List<KeyValue> edits, Integer lid)
throws IOException {
Map<byte[], List<KeyValue>> familyMap;
familyMap = new HashMap<byte[], List<KeyValue>>();
familyMap.put(family, edits);
Put p = new Put();
Put p = new Put(row);
p.setFamilyMap(familyMap);
p.setClusterId(HConstants.DEFAULT_CLUSTER_ID);
p.setWriteToWAL(true);
@ -3912,7 +3910,7 @@ public class HRegion implements HeapSize { // , Writable{
edits.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
HConstants.META_VERSION_QUALIFIER, now,
Bytes.toBytes(HConstants.META_VERSION)));
meta.put(HConstants.CATALOG_FAMILY, edits, lid);
meta.put(row, HConstants.CATALOG_FAMILY, edits, lid);
} finally {
meta.releaseRowLock(lid);
}

View File

@ -3767,7 +3767,7 @@ public class HRegionServer implements ClientProtocol,
boolean batchContainsPuts = false, batchContainsDelete = false;
try {
ActionResult.Builder resultBuilder = ActionResult.newBuilder();
NameBytesPair value = ProtobufUtil.toParameter(new Result());
NameBytesPair value = ProtobufUtil.toParameter(ClientProtos.Result.newBuilder().build());
resultBuilder.setValue(value);
ActionResult result = resultBuilder.build();

View File

@ -290,6 +290,7 @@ public class RemoteHTable implements HTableInterface {
// fall through
case 404:
return new Result();
case 509:
try {
Thread.sleep(sleepTime);

View File

@ -23,18 +23,12 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowLock;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
@ -151,6 +145,8 @@ public class TestSerialization {
HConstants.EMPTY_END_ROW);
}
/*
* TODO
@Test public void testPut() throws Exception{
byte[] row = "row".getBytes();
byte[] fam = "fam".getBytes();
@ -254,6 +250,7 @@ public class TestSerialization {
}
}
}
*/
@Test public void testGet() throws Exception{
byte[] row = "row".getBytes();
@ -347,6 +344,8 @@ public class TestSerialization {
assertEquals(tr.getMin(), desTr.getMin());
}
/*
* TODO
@Test public void testResultEmpty() throws Exception {
List<KeyValue> keys = new ArrayList<KeyValue>();
Result r = new Result(keys);
@ -520,6 +519,7 @@ public class TestSerialization {
assertTrue(deResults.length == 0);
}
*/
@Test public void testTimeRange() throws Exception{
TimeRange tr = new TimeRange(0,5);

View File

@ -19,12 +19,6 @@
package org.apache.hadoop.hbase.client;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.hbase.SmallTests;
@ -35,32 +29,9 @@ import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestAttributes {
@Test
public void testAttributesSerialization() throws IOException {
Put put = new Put();
put.setAttribute("attribute1", Bytes.toBytes("value1"));
put.setAttribute("attribute2", Bytes.toBytes("value2"));
put.setAttribute("attribute3", Bytes.toBytes("value3"));
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutput out = new DataOutputStream(byteArrayOutputStream);
put.write(out);
Put put2 = new Put();
Assert.assertTrue(put2.getAttributesMap().isEmpty());
put2.readFields(new DataInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray())));
Assert.assertNull(put2.getAttribute("absent"));
Assert.assertTrue(Arrays.equals(Bytes.toBytes("value1"), put2.getAttribute("attribute1")));
Assert.assertTrue(Arrays.equals(Bytes.toBytes("value2"), put2.getAttribute("attribute2")));
Assert.assertTrue(Arrays.equals(Bytes.toBytes("value3"), put2.getAttribute("attribute3")));
Assert.assertEquals(3, put2.getAttributesMap().size());
}
@Test
public void testPutAttributes() {
Put put = new Put();
Put put = new Put(new byte [] {});
Assert.assertTrue(put.getAttributesMap().isEmpty());
Assert.assertNull(put.getAttribute("absent"));
@ -108,7 +79,7 @@ public class TestAttributes {
@Test
public void testDeleteAttributes() {
Delete del = new Delete();
Delete del = new Delete(new byte [] {});
Assert.assertTrue(del.getAttributesMap().isEmpty());
Assert.assertNull(del.getAttribute("absent"));
@ -171,7 +142,7 @@ public class TestAttributes {
@Test
public void testDeleteId() {
Delete delete = new Delete();
Delete delete = new Delete(new byte [] {});
Assert.assertNull("Make sure id is null if unset", delete.toMap().get("id"));
delete.setId("myId");
Assert.assertEquals("myId", delete.toMap().get("id"));
@ -179,7 +150,7 @@ public class TestAttributes {
@Test
public void testPutId() {
Put put = new Put();
Put put = new Put(new byte [] {});
Assert.assertNull("Make sure id is null if unset", put.toMap().get("id"));
put.setId("myId");
Assert.assertEquals("myId", put.toMap().get("id"));

View File

@ -26,10 +26,14 @@ import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.ipc.HBaseClient;
import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
@ -42,6 +46,10 @@ import static org.junit.Assert.*;
@Category(MediumTests.class)
public class TestMultiParallel {
private static final Log LOG = LogFactory.getLog(TestMultiParallel.class);
{
((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.ALL);
}
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final byte[] VALUE = Bytes.toBytes("value");
private static final byte[] QUALIFIER = Bytes.toBytes("qual");
@ -200,6 +208,12 @@ public class TestMultiParallel {
table.close();
}
@Test (timeout=300000)
public void testFlushCommitsNoAbort() throws Exception {
LOG.info("test=testFlushCommitsNoAbort");
doTestFlushCommits(false);
}
/**
* Only run one Multi test with a forced RegionServer abort. Otherwise, the
* unit tests will take an unnecessarily long time to run.
@ -212,12 +226,6 @@ public class TestMultiParallel {
doTestFlushCommits(true);
}
@Test (timeout=300000)
public void testFlushCommitsNoAbort() throws Exception {
LOG.info("test=testFlushCommitsNoAbort");
doTestFlushCommits(false);
}
private void doTestFlushCommits(boolean doAbort) throws Exception {
// Load the data
LOG.info("get new table");
@ -249,16 +257,14 @@ public class TestMultiParallel {
validateLoadedData(table);
// Validate server and region count
List<JVMClusterUtil.RegionServerThread> liveRSs =
UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
List<JVMClusterUtil.RegionServerThread> liveRSs = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
int count = 0;
for (JVMClusterUtil.RegionServerThread t: liveRSs) {
count++;
LOG.info("Count=" + count + ", Alive=" + t.getRegionServer());
}
LOG.info("Count=" + count);
Assert.assertEquals("Server count=" + count + ", abort=" + doAbort,
(doAbort ? 1 : 2), count);
Assert.assertEquals("Server count=" + count + ", abort=" + doAbort, (doAbort? 1 : 2), count);
for (JVMClusterUtil.RegionServerThread t: liveRSs) {
int regions = ProtobufUtil.getOnlineRegions(t.getRegionServer()).size();
Assert.assertTrue("Count of regions=" + regions, regions > 10);
@ -416,6 +422,7 @@ public class TestMultiParallel {
validateResult(multiRes[1], QUAL4, Bytes.toBytes("xyz"));
validateResult(multiRes[0], QUAL2, Bytes.toBytes(2L));
validateResult(multiRes[0], QUAL3, Bytes.toBytes(1L));
table.close();
}
@Test(timeout=300000)

View File

@ -139,7 +139,7 @@ public class TestTimestampsFilter {
// Request an empty list of versions using the Timestamps filter;
// Should return none.
kvs = getNVersions(ht, FAMILY, 2, 2, new ArrayList<Long>());
assertEquals(0, kvs.length);
assertEquals(0, kvs == null? 0: kvs.length);
//
// Test the filter using a Scan operation
@ -272,7 +272,7 @@ public class TestTimestampsFilter {
// ask for versions that do not exist.
kvs = getNVersions(ht, cf, rowIdx, colIdx,
Arrays.asList(101L, 102L));
assertEquals(0, kvs.length);
assertEquals(0, kvs == null? 0: kvs.length);
// ask for some versions that exist and some that do not.
kvs = getNVersions(ht, cf, rowIdx, colIdx,