HADOOP-1498. Replace boxed types with primitives in many places. Contributed by stack.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@548523 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 2007-06-18 22:59:14 +00:00
parent f503143412
commit eef130fc91
34 changed files with 830 additions and 871 deletions
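The change in miniature: call sites that used to copy data out of a BytesWritable wrapper now receive a raw byte[] straight from the RPC layer. A hedged, standalone sketch of the before/after pattern, using only the accessors that appear in this patch (getSize()/get() on the old boxed value); the class name is illustrative:

    import org.apache.hadoop.io.BytesWritable;

    public class BoxedVsPrimitive {
      public static void main(String[] args) {
        // Before: the value arrived boxed; callers made a defensive copy.
        BytesWritable boxed = new BytesWritable("cell-value".getBytes());
        byte[] copy = new byte[boxed.getSize()];
        System.arraycopy(boxed.get(), 0, copy, 0, copy.length);

        // After: APIs such as HRegionInterface.get() return byte[] directly,
        // so both the wrapper object and the copy disappear.
        byte[] direct = "cell-value".getBytes();
        System.out.println(new String(copy).equals(new String(direct)));  // true
      }
    }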


@ -34,4 +34,4 @@ Trunk (unreleased changes)
19. HADOOP-1415 Integrate BSD licensed bloom filter implementation.
20. HADOOP-1465 Add cluster stop/start scripts for hbase
21. HADOOP-1415 Provide configurable per-column bloom filters - part 2.
22. HADOOP-1498. Replace boxed types with primitives in many places.


@ -47,7 +47,9 @@ public class BloomFilterDescriptor implements WritableComparable {
public static final int RETOUCHED_BLOOMFILTER = 3;
/** Default constructor - used in conjunction with Writable */
public BloomFilterDescriptor() {}
public BloomFilterDescriptor() {
super();
}
/**
* @param type The kind of bloom filter to use.


@ -18,29 +18,26 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.TreeMap;
import java.util.Vector;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
/*******************************************************************************
/**
* Abstract base class that implements the HScannerInterface.
* Used by the concrete HMemcacheScanner and HStoreScanners
******************************************************************************/
*/
public abstract class HAbstractScanner implements HInternalScannerInterface {
final Log LOG = LogFactory.getLog(this.getClass().getName());
// Pattern to determine if a column key is a regex
static Pattern isRegexPattern = Pattern.compile("^.*[\\\\+|^&*$\\[\\]\\}{)(]+.*$");
static Pattern isRegexPattern =
Pattern.compile("^.*[\\\\+|^&*$\\[\\]\\}{)(]+.*$");
// The kind of match we are doing on a column:
private static enum MATCH_TYPE {
/** Just check the column family name */
FAMILY_ONLY,
@ -55,7 +52,6 @@ public abstract class HAbstractScanner implements HInternalScannerInterface {
// 1. Match on the column family name only
// 2. Match on the column family + column key regex
// 3. Simple match: compare column family + column key literally
private static class ColumnMatcher {
private boolean wildCardmatch;
private MATCH_TYPE matchType;
@ -63,33 +59,24 @@ public abstract class HAbstractScanner implements HInternalScannerInterface {
private Pattern columnMatcher;
private Text col;
ColumnMatcher(Text col) throws IOException {
String column = col.toString();
ColumnMatcher(final Text col) throws IOException {
Text qualifier = HStoreKey.extractQualifier(col);
try {
int colpos = column.indexOf(":");
if(colpos == -1) {
throw new InvalidColumnNameException("Column name has no family indicator.");
}
String columnkey = column.substring(colpos + 1);
if(columnkey == null || columnkey.length() == 0) {
if(qualifier == null || qualifier.getLength() == 0) {
this.matchType = MATCH_TYPE.FAMILY_ONLY;
this.family = column.substring(0, colpos);
this.family = HStoreKey.extractFamily(col).toString();
this.wildCardmatch = true;
} else if(isRegexPattern.matcher(columnkey).matches()) {
} else if(isRegexPattern.matcher(qualifier.toString()).matches()) {
this.matchType = MATCH_TYPE.REGEX;
this.columnMatcher = Pattern.compile(column);
this.columnMatcher = Pattern.compile(col.toString());
this.wildCardmatch = true;
} else {
this.matchType = MATCH_TYPE.SIMPLE;
this.col = col;
this.wildCardmatch = false;
}
} catch(Exception e) {
throw new IOException("Column: " + column + ": " + e.getMessage());
throw new IOException("Column: " + col + ": " + e.getMessage());
}
}
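The three MATCH_TYPE cases described in the comments above can be seen by classifying a few column names against the same regex the scanner uses. A hedged, standalone sketch (class and method names here are illustrative only):

    import java.util.regex.Pattern;

    public class MatchTypeDemo {
      // The pattern HAbstractScanner uses to decide whether a qualifier is a regex.
      static final Pattern IS_REGEX =
          Pattern.compile("^.*[\\\\+|^&*$\\[\\]\\}{)(]+.*$");

      static String kind(String column) {
        String qualifier = column.substring(column.indexOf(':') + 1);
        if (qualifier.length() == 0) return "FAMILY_ONLY";
        if (IS_REGEX.matcher(qualifier).matches()) return "REGEX";
        return "SIMPLE";
      }

      public static void main(String[] args) {
        System.out.println(kind("anchor:"));       // FAMILY_ONLY: empty qualifier
        System.out.println(kind("anchor:.*"));     // REGEX: qualifier holds regex chars
        System.out.println(kind("anchor:link"));   // SIMPLE: literal column name
      }
    }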
@ -119,8 +106,10 @@ public abstract class HAbstractScanner implements HInternalScannerInterface {
protected boolean scannerClosed = false; // True when scanning is done
protected HStoreKey keys[]; // Keys retrieved from the sources
protected BytesWritable vals[]; // Values that correspond to those keys
// Keys retrieved from the sources
protected HStoreKey keys[];
// Values that correspond to those keys
protected byte [][] vals;
protected long timestamp; // The timestamp to match entries against
private boolean wildcardMatch;
@ -218,7 +207,7 @@ public abstract class HAbstractScanner implements HInternalScannerInterface {
*
* @see org.apache.hadoop.hbase.HScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, java.util.TreeMap)
*/
public boolean next(HStoreKey key, TreeMap<Text, BytesWritable> results)
public boolean next(HStoreKey key, TreeMap<Text, byte []> results)
throws IOException {
// Find the next row label (and timestamp)
Text chosenRow = null;


@ -30,11 +30,10 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
@ -285,10 +284,8 @@ public class HClient implements HConstants {
}
boolean found = false;
for(int j = 0; j < values.length; j++) {
if(values[j].getKey().getColumn().equals(COL_REGIONINFO)) {
byte[] bytes = new byte[values[j].getData().getSize()];
System.arraycopy(values[j].getData().get(), 0, bytes, 0, bytes.length);
inbuf.reset(bytes, bytes.length);
if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) {
inbuf.reset(values[j].getData(), values[j].getData().length);
info.readFields(inbuf);
if(info.tableDesc.getName().equals(tableName)) {
found = true;
@ -398,9 +395,7 @@ public class HClient implements HConstants {
valuesfound += 1;
for(int j = 0; j < values.length; j++) {
if(values[j].getKey().getColumn().equals(COL_REGIONINFO)) {
byte[] bytes = new byte[values[j].getData().getSize()];
System.arraycopy(values[j].getData().get(), 0, bytes, 0, bytes.length);
inbuf.reset(bytes, bytes.length);
inbuf.reset(values[j].getData(), values[j].getData().length);
info.readFields(inbuf);
isenabled = !info.offLine;
break;
@ -483,9 +478,7 @@ public class HClient implements HConstants {
valuesfound += 1;
for(int j = 0; j < values.length; j++) {
if(values[j].getKey().getColumn().equals(COL_REGIONINFO)) {
byte[] bytes = new byte[values[j].getData().getSize()];
System.arraycopy(values[j].getData().get(), 0, bytes, 0, bytes.length);
inbuf.reset(bytes, bytes.length);
inbuf.reset(values[j].getData(), values[j].getData().length);
info.readFields(inbuf);
disabled = info.offLine;
break;
@ -737,8 +730,8 @@ public class HClient implements HConstants {
* @throws IOException
*/
private TreeMap<Text, RegionLocation> scanOneMetaRegion(final RegionLocation t,
final Text tableName) throws IOException {
final Text tableName)
throws IOException {
HRegionInterface server = getHRegionConnection(t.serverAddress);
TreeMap<Text, RegionLocation> servers = new TreeMap<Text, RegionLocation>();
for(int tries = 0; servers.size() == 0 && tries < this.numRetries;
@ -772,9 +765,7 @@ public class HClient implements HConstants {
byte[] bytes = null;
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
for(int i = 0; i < values.length; i++) {
bytes = new byte[values[i].getData().getSize()];
System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length);
results.put(values[i].getKey().getColumn(), bytes);
results.put(values[i].getKey().getColumn(), values[i].getData());
}
regionInfo = new HRegionInfo();
bytes = results.get(COL_REGIONINFO);
@ -900,8 +891,7 @@ public class HClient implements HConstants {
}
for(int i = 0; i < values.length; i++) {
if(values[i].getKey().getColumn().equals(COL_REGIONINFO)) {
byte[] bytes = values[i].getData().get();
inbuf.reset(bytes, bytes.length);
inbuf.reset(values[i].getData(), values[i].getData().length);
HRegionInfo info = new HRegionInfo();
info.readFields(inbuf);
@ -967,22 +957,19 @@ public class HClient implements HConstants {
/**
* Get a single value for the specified row and column
*
* @param row - row key
* @param column - column name
* @return - value for specified row/column
* @param row row key
* @param column column name
* @return value for specified row/column
* @throws IOException
*/
public byte[] get(Text row, Text column) throws IOException {
RegionLocation info = null;
BytesWritable value = null;
byte [] value = null;
for(int tries = 0; tries < numRetries && info == null; tries++) {
info = getRegionLocation(row);
try {
value = getHRegionConnection(info.serverAddress).get(
info.regionInfo.regionName, row, column);
value = getHRegionConnection(info.serverAddress).
get(info.regionInfo.regionName, row, column);
} catch(NotServingRegionException e) {
if(tries == numRetries - 1) {
// No more tries
@ -992,13 +979,7 @@ public class HClient implements HConstants {
info = null;
}
}
if(value != null) {
byte[] bytes = new byte[value.getSize()];
System.arraycopy(value.get(), 0, bytes, 0, bytes.length);
return bytes;
}
return null;
return value;
}
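A hedged usage sketch of the retrying get above. It assumes client is an already-open HClient (construction and table selection are outside this hunk) and uses only signatures shown in this patch:

    Text row = new Text("row_key");
    Text column = new Text("family:qualifier");
    byte[] newest = client.get(row, column);          // latest version, or null
    byte[][] recent = client.get(row, column, 3);     // up to three newest versions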
/**
@ -1012,15 +993,12 @@ public class HClient implements HConstants {
*/
public byte[][] get(Text row, Text column, int numVersions) throws IOException {
RegionLocation info = null;
BytesWritable[] values = null;
byte [][] values = null;
for(int tries = 0; tries < numRetries && info == null; tries++) {
info = getRegionLocation(row);
try {
values = getHRegionConnection(info.serverAddress).get(
info.regionInfo.regionName, row, column, numVersions);
info.regionInfo.regionName, row, column, numVersions);
} catch(NotServingRegionException e) {
if(tries == numRetries - 1) {
// No more tries
@ -1034,9 +1012,7 @@ public class HClient implements HConstants {
if(values != null) {
ArrayList<byte[]> bytes = new ArrayList<byte[]>();
for(int i = 0 ; i < values.length; i++) {
byte[] value = new byte[values[i].getSize()];
System.arraycopy(values[i].get(), 0, value, 0, value.length);
bytes.add(value);
bytes.add(values[i]);
}
return bytes.toArray(new byte[values.length][]);
}
@ -1057,14 +1033,12 @@ public class HClient implements HConstants {
public byte[][] get(Text row, Text column, long timestamp, int numVersions)
throws IOException {
RegionLocation info = null;
BytesWritable[] values = null;
byte [][] values = null;
for(int tries = 0; tries < numRetries && info == null; tries++) {
info = getRegionLocation(row);
try {
values = getHRegionConnection(info.serverAddress).get(
info.regionInfo.regionName, row, column, timestamp, numVersions);
values = getHRegionConnection(info.serverAddress).
get(info.regionInfo.regionName, row, column, timestamp, numVersions);
} catch(NotServingRegionException e) {
if(tries == numRetries - 1) {
@ -1079,9 +1053,7 @@ public class HClient implements HConstants {
if(values != null) {
ArrayList<byte[]> bytes = new ArrayList<byte[]>();
for(int i = 0 ; i < values.length; i++) {
byte[] value = new byte[values[i].getSize()];
System.arraycopy(values[i].get(), 0, value, 0, value.length);
bytes.add(value);
bytes.add(values[i]);
}
return bytes.toArray(new byte[values.length][]);
}
@ -1118,9 +1090,7 @@ public class HClient implements HConstants {
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
if(value != null && value.length != 0) {
for(int i = 0; i < value.length; i++) {
byte[] bytes = new byte[value[i].getData().getSize()];
System.arraycopy(value[i].getData().get(), 0, bytes, 0, bytes.length);
results.put(value[i].getKey().getColumn(), bytes);
results.put(value[i].getKey().getColumn(), value[i].getData());
}
}
return results;
@ -1242,7 +1212,7 @@ public class HClient implements HConstants {
public void put(long lockid, Text column, byte val[]) throws IOException {
try {
this.currentServer.put(this.currentRegion, this.clientid, lockid, column,
new BytesWritable(val));
val);
} catch(IOException e) {
try {
this.currentServer.abort(this.currentRegion, this.clientid, lockid);
@ -1432,9 +1402,7 @@ public class HClient implements HConstants {
key.setRow(values[i].getKey().getRow());
key.setVersion(values[i].getKey().getTimestamp());
key.setColumn(EMPTY_COLUMN);
byte[] bytes = new byte[values[i].getData().getSize()];
System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length);
results.put(values[i].getKey().getColumn(), bytes);
results.put(values[i].getKey().getColumn(), values[i].getData());
}
}
return values == null ? false : values.length != 0;


@ -15,7 +15,7 @@
*/
package org.apache.hadoop.hbase;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Text;
/**
@ -106,11 +106,10 @@ public interface HConstants {
static final String UTF8_ENCODING = "UTF-8";
/** Value stored for a deleted item */
static final BytesWritable DELETE_BYTES =
new BytesWritable("HBASE::DELETEVAL".getBytes());
static final ImmutableBytesWritable DELETE_BYTES =
new ImmutableBytesWritable("HBASE::DELETEVAL".getBytes());
/** Value written to HLog on a complete cache flush */
static final BytesWritable COMPLETE_CACHEFLUSH =
new BytesWritable("HBASE::CACHEFLUSH".getBytes());
static final ImmutableBytesWritable COMPLETE_CACHEFLUSH =
new ImmutableBytesWritable("HBASE::CACHEFLUSH".getBytes());
}
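The switch to ImmutableBytesWritable matters because DELETE_BYTES is a shared constant; an immutable wrapper guards it against accidental mutation. Elsewhere in this patch the constant is compared against raw byte[] values and unwrapped with get(). A hedged fragment mirroring HRegion.put() and HRegion.delete() (val, lockid, and targetCol assumed in scope):

    // Reject an attempt to store the reserved delete marker (as HRegion.put does):
    if (DELETE_BYTES.compareTo(val) == 0) {
      throw new IOException("Cannot insert value: " + val);
    }
    // Write the marker itself (as HRegion.delete does):
    localput(lockid, targetCol, DELETE_BYTES.get());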


@ -18,27 +18,29 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.TreeMap;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
/**
* Internally, we need to be able to determine if the scanner is doing wildcard
* column matches (when only a column family is specified or if a column regex
* is specified) or if multiple members of the same column family were specified.
*
* If so, we need to ignore the timestamp to ensure that we get all the family
* members, as they may have been last updated at different times.
*
* is specified) or if multiple members of the same column family were
* specified. If so, we need to ignore the timestamp to ensure that we get all
* the family members, as they may have been last updated at different times.
* This interface exposes two APIs for querying the scanner.
*/
public interface HInternalScannerInterface {
public boolean next(HStoreKey key, TreeMap<Text, BytesWritable> results) throws IOException;
public boolean next(HStoreKey key, TreeMap<Text, byte []> results)
throws IOException;
/**
* Close the scanner.
*/
public void close();
/** Returns true if the scanner is matching a column family or regex */
public boolean isWildcardScanner();
/** Returns true if the scanner is matching multiple column family members */
public boolean isMultipleMatchScanner();
}
}
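A hedged sketch of driving this interface: callers loop on next() until it returns false, then close(). Scanner construction is region-internal and not shown here; scanner, below, is assumed to be an open HInternalScannerInterface:

    HStoreKey key = new HStoreKey();
    TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
    try {
      while (scanner.next(key, results)) {
        // Under isWildcardScanner()/isMultipleMatchScanner(), entries for one
        // family may carry differing timestamps, per the javadoc above.
        for (Map.Entry<Text, byte[]> e : results.entrySet()) {
          System.out.println(key.getRow() + "/" + e.getKey());
        }
        results.clear();
      }
    } finally {
      scanner.close();
    }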


@ -78,7 +78,7 @@ public class HLog implements HConstants {
long filenum = 0;
transient int numEntries = 0;
Integer rollLock = 0;
Integer rollLock = new Integer(0);
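A plausible reason for the explicit new Integer(0) here (an inference; the patch does not say): this field serves as a monitor object, and a bare 0 autoboxes to the JVM's shared cached Integer, so two such "locks" would be the very same instance. A quick demonstration of the cache:

    Integer a = 0;                    // autoboxing: Integer.valueOf(0), cached
    Integer b = 0;
    System.out.println(a == b);       // true: one shared object, a risky monitor
    System.out.println(new Integer(0) == new Integer(0));  // false: distinct monitors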
/**
* Split up a bunch of log files, that are no longer being written to,
@ -205,6 +205,7 @@ public class HLog implements HConstants {
try {
wait();
} catch (InterruptedException ie) {
// continue;
}
}
@ -282,8 +283,8 @@ public class HLog implements HConstants {
* This is a convenience method that computes a new filename with
* a given file-number.
*/
Path computeFilename(long filenum) {
return new Path(dir, HLOG_DATFILE + String.format("%1$03d", filenum));
Path computeFilename(final long fn) {
return new Path(dir, HLOG_DATFILE + String.format("%1$03d", fn));
}
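A quick worked example of the format string: "%1$03d" zero-pads the file number to three digits, so log file names sort lexicographically in write order:

    System.out.println(String.format("%1$03d", 7));    // prints "007"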
/**
@ -333,7 +334,7 @@ public class HLog implements HConstants {
* @throws IOException
*/
synchronized void append(Text regionName, Text tableName, Text row,
TreeMap<Text, BytesWritable> columns, long timestamp)
TreeMap<Text, byte []> columns, long timestamp)
throws IOException {
if(closed) {
throw new IOException("Cannot append; log is closed");
@ -350,7 +351,7 @@ public class HLog implements HConstants {
}
int counter = 0;
for (Map.Entry<Text, BytesWritable> es: columns.entrySet()) {
for (Map.Entry<Text, byte []> es: columns.entrySet()) {
HLogKey logKey =
new HLogKey(regionName, tableName, row, seqNum[counter++]);
HLogEdit logEdit = new HLogEdit(es.getKey(), es.getValue(), timestamp);
@ -401,6 +402,7 @@ public class HLog implements HConstants {
try {
wait();
} catch (InterruptedException ie) {
// continue
}
}
insideCacheFlush = true;
@ -427,7 +429,7 @@ public class HLog implements HConstants {
}
writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
new HLogEdit(HLog.METACOLUMN, COMPLETE_CACHEFLUSH,
new HLogEdit(HLog.METACOLUMN, COMPLETE_CACHEFLUSH.get(),
System.currentTimeMillis()));
numEntries++;


@ -27,14 +27,15 @@ import java.io.*;
* This just indicates the column and value.
******************************************************************************/
public class HLogEdit implements Writable {
Text column = new Text();
BytesWritable val = new BytesWritable();
long timestamp;
private Text column = new Text();
private byte [] val;
private long timestamp;
public HLogEdit() {
super();
}
public HLogEdit(Text column, BytesWritable bval, long timestamp) {
public HLogEdit(Text column, byte [] bval, long timestamp) {
this.column.set(column);
this.val = bval;
this.timestamp = timestamp;
@ -44,7 +45,7 @@ public class HLogEdit implements Writable {
return this.column;
}
public BytesWritable getVal() {
public byte [] getVal() {
return this.val;
}
@ -55,7 +56,7 @@ public class HLogEdit implements Writable {
@Override
public String toString() {
return getColumn().toString() + " " + this.getTimestamp() + " " +
new String(getVal().get()).trim();
new String(getVal()).trim();
}
//////////////////////////////////////////////////////////////////////////////
@ -64,13 +65,15 @@ public class HLogEdit implements Writable {
public void write(DataOutput out) throws IOException {
this.column.write(out);
this.val.write(out);
out.writeShort(this.val.length);
out.write(this.val);
out.writeLong(timestamp);
}
public void readFields(DataInput in) throws IOException {
this.column.readFields(in);
this.val.readFields(in);
this.val = new byte[in.readShort()];
in.readFully(this.val);
this.timestamp = in.readLong();
}
}
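The new wire format is: column, a short value length, the value bytes, then the timestamp (note writeShort caps a value at Short.MAX_VALUE bytes). A hedged round-trip sketch through the Writable methods above:

    HLogEdit edit = new HLogEdit(new Text("family:qualifier"),
        "value".getBytes(), System.currentTimeMillis());
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    edit.write(new DataOutputStream(bos));

    HLogEdit copy = new HLogEdit();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
    // copy.getVal() now holds the original bytes; copy.getTimestamp() the original time.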


@ -34,6 +34,7 @@ public class HLogKey implements WritableComparable {
/** Create an empty key useful when deserializing */
public HLogKey() {
super();
}
/**
@ -47,6 +48,7 @@ public class HLogKey implements WritableComparable {
* @param logSeqNum - log sequence number
*/
public HLogKey(Text regionName, Text tablename, Text row, long logSeqNum) {
// TODO: Is this copy of the instances necessary? They are expensive.
this.regionName.set(regionName);
this.tablename.set(tablename);
this.row.set(row);


@ -37,7 +37,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
@ -179,10 +178,7 @@ public class HMaster implements HConstants, HMasterInterface,
}
for (int i = 0; i < values.length; i++) {
byte[] bytes = new byte[values[i].getData().getSize()];
System.arraycopy(values[i].getData().get(), 0, bytes, 0,
bytes.length);
results.put(values[i].getKey().getColumn(), bytes);
results.put(values[i].getKey().getColumn(), values[i].getData());
}
HRegionInfo info = HRegion.getRegionInfo(results);
@ -272,7 +268,7 @@ public class HMaster implements HConstants, HMasterInterface,
// The current assignment is no good; load the region.
unassignedRegions.put(info.regionName, info);
assignAttempts.put(info.regionName, 0L);
assignAttempts.put(info.regionName, Long.valueOf(0L));
}
}
}
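The recurring Long.valueOf(0L) edits in this file make the boxing into the Map<Text, Long> explicit rather than relying on autoboxing; valueOf also reuses cached instances for small values. A minimal illustration:

    Long a = Long.valueOf(0L);
    Long b = Long.valueOf(0L);
    System.out.println(a.equals(b));   // true; boxed keys compare by equals, not ==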
@ -333,7 +329,7 @@ public class HMaster implements HConstants, HMasterInterface,
private RootScanner rootScanner;
private Thread rootScannerThread;
Integer rootScannerLock = 0;
Integer rootScannerLock = new Integer(0);
@SuppressWarnings("unchecked")
static class MetaRegion implements Comparable {
@ -492,7 +488,7 @@ public class HMaster implements HConstants, HMasterInterface,
MetaScanner metaScanner;
private Thread metaScannerThread;
Integer metaScannerLock = 0;
Integer metaScannerLock = new Integer(0);
/**
* The 'unassignedRegions' table maps from a region name to a HRegionInfo
@ -642,7 +638,8 @@ public class HMaster implements HConstants, HMasterInterface,
this.pendingRegions =
Collections.synchronizedSortedSet(new TreeSet<Text>());
this.assignAttempts.put(HGlobals.rootRegionInfo.regionName, 0L);
this.assignAttempts.put(HGlobals.rootRegionInfo.regionName,
Long.valueOf(0L));
this.killList =
Collections.synchronizedSortedMap(
@ -655,9 +652,7 @@ public class HMaster implements HConstants, HMasterInterface,
Collections.synchronizedSortedSet(new TreeSet<Text>());
// We're almost open for business
this.closed = false;
LOG.info("HMaster initialized on " + this.address.toString());
}
@ -815,7 +810,9 @@ public class HMaster implements HConstants, HMasterInterface,
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HMasterRegionInterface#regionServerStartup(org.apache.hadoop.hbase.HServerInfo)
*/
public void regionServerStartup(HServerInfo serverInfo) throws IOException {
@SuppressWarnings("unused")
public void regionServerStartup(HServerInfo serverInfo)
throws IOException {
String s = serverInfo.getServerAddress().toString().trim();
HServerInfo storedInfo = null;
LOG.info("received start message from: " + s);
@ -834,11 +831,15 @@ public class HMaster implements HConstants, HMasterInterface,
// Either way, record the new server
serversToServerInfo.put(s, serverInfo);
if(!closed) {
Text serverLabel = new Text(s);
long serverLabel = getServerLabel(s);
LOG.debug("Created lease for " + serverLabel);
serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(s));
}
}
private long getServerLabel(final String s) {
return s.hashCode();
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HMasterRegionInterface#regionServerReport(org.apache.hadoop.hbase.HServerInfo, org.apache.hadoop.hbase.HMsg[])
@ -846,7 +847,7 @@ public class HMaster implements HConstants, HMasterInterface,
public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[])
throws IOException {
String s = serverInfo.getServerAddress().toString().trim();
Text serverLabel = new Text(s);
long serverLabel = getServerLabel(s);
if (closed) {
// Cancel the server's lease
@ -874,7 +875,7 @@ public class HMaster implements HConstants, HMasterInterface,
allMetaRegionsScanned = false;
}
unassignedRegions.put(info.regionName, info);
assignAttempts.put(info.regionName, 0L);
assignAttempts.put(info.regionName, Long.valueOf(0L));
}
// We don't need to return anything to the server because it isn't
@ -934,7 +935,8 @@ public class HMaster implements HConstants, HMasterInterface,
}
/** cancel a server's lease */
private void cancelLease(String serverName, Text serverLabel) throws IOException {
private void cancelLease(final String serverName, final long serverLabel)
throws IOException {
if (serversToServerInfo.remove(serverName) != null) {
// Only cancel lease once.
// This method can be called a couple of times during shutdown.
@ -1035,7 +1037,7 @@ public class HMaster implements HConstants, HMasterInterface,
if(region.regionName.compareTo(HGlobals.rootRegionInfo.regionName) == 0) { // Root region
rootRegionLocation = null;
unassignedRegions.put(region.regionName, region);
assignAttempts.put(region.regionName, 0L);
assignAttempts.put(region.regionName, Long.valueOf(0L));
} else {
boolean reassignRegion = true;
@ -1115,7 +1117,7 @@ public class HMaster implements HConstants, HMasterInterface,
returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
assignAttempts.put(curRegionName, now);
assignAttempts.put(curRegionName, Long.valueOf(now));
counter++;
}
@ -1214,7 +1216,6 @@ public class HMaster implements HConstants, HMasterInterface,
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
Text row = null;
byte[] bytes = null;
for(int i = 0; i < values.length; i++) {
if(row == null) {
row = values[i].getKey().getRow();
@ -1225,12 +1226,10 @@ public class HMaster implements HConstants, HMasterInterface,
+ row + ", currentRow=" + values[i].getKey().getRow());
}
}
bytes = new byte[values[i].getData().getSize()];
System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length);
results.put(values[i].getKey().getColumn(), bytes);
results.put(values[i].getKey().getColumn(), values[i].getData());
}
bytes = results.get(COL_SERVER);
byte [] bytes = results.get(COL_SERVER);
String serverName = null;
if(bytes == null || bytes.length == 0) {
// No server
@ -1335,21 +1334,18 @@ public class HMaster implements HConstants, HMasterInterface,
}
// Remove server from root/meta entries
for(int i = 0; i < toDoList.size(); i++) {
ToDoEntry e = toDoList.get(i);
long lockid = server.startUpdate(regionName, clientId, e.row);
if(e.deleteRegion) {
server.delete(regionName, clientId, lockid, COL_REGIONINFO);
} else if(e.regionOffline) {
e.info.offLine = true;
ByteArrayOutputStream byteValue = new ByteArrayOutputStream();
DataOutputStream s = new DataOutputStream(byteValue);
e.info.write(s);
server.put(regionName, clientId, lockid, COL_REGIONINFO,
new BytesWritable(byteValue.toByteArray()));
byteValue.toByteArray());
}
server.delete(regionName, clientId, lockid, COL_SERVER);
server.delete(regionName, clientId, lockid, COL_STARTCODE);
@ -1363,7 +1359,7 @@ public class HMaster implements HConstants, HMasterInterface,
HRegionInfo regionInfo = e.getValue();
unassignedRegions.put(region, regionInfo);
assignAttempts.put(region, 0L);
assignAttempts.put(region, Long.valueOf(0L));
}
}
@ -1384,7 +1380,8 @@ public class HMaster implements HConstants, HMasterInterface,
rootRegionLocation = null;
unassignedRegions.put(HGlobals.rootRegionInfo.regionName,
HGlobals.rootRegionInfo);
assignAttempts.put(HGlobals.rootRegionInfo.regionName, 0L);
assignAttempts.put(HGlobals.rootRegionInfo.regionName,
Long.valueOf(0L));
}
// Scan the ROOT region
@ -1525,7 +1522,7 @@ public class HMaster implements HConstants, HMasterInterface,
regionInfo.write(s);
server.put(metaRegionName, clientId, lockid, COL_REGIONINFO,
new BytesWritable(byteValue.toByteArray()));
byteValue.toByteArray());
}
server.delete(metaRegionName, clientId, lockid, COL_SERVER);
server.delete(metaRegionName, clientId, lockid, COL_STARTCODE);
@ -1546,7 +1543,7 @@ public class HMaster implements HConstants, HMasterInterface,
}
unassignedRegions.put(regionInfo.regionName, regionInfo);
assignAttempts.put(regionInfo.regionName, 0L);
assignAttempts.put(regionInfo.regionName, Long.valueOf(0L));
} else if(deleteRegion) {
try {
@ -1569,36 +1566,27 @@ public class HMaster implements HConstants, HMasterInterface,
private class PendingOpenReport extends PendingOperation {
private boolean rootRegion;
private Text regionName;
private BytesWritable serverAddress;
private BytesWritable startCode;
private byte [] serverAddress;
private byte [] startCode;
PendingOpenReport(HServerInfo info, HRegionInfo region) {
if(region.tableDesc.getName().equals(META_TABLE_NAME)) {
if (region.tableDesc.getName().equals(META_TABLE_NAME)) {
// The region which just came on-line is a META region.
// We need to look in the ROOT region for its information.
this.rootRegion = true;
} else {
// Just an ordinary region. Look for it in the META table.
this.rootRegion = false;
}
this.regionName = region.regionName;
try {
this.serverAddress = new BytesWritable(
info.getServerAddress().toString().getBytes(UTF8_ENCODING));
this.startCode = new BytesWritable(
String.valueOf(info.getStartCode()).getBytes(UTF8_ENCODING));
this.serverAddress = info.getServerAddress().toString().
getBytes(UTF8_ENCODING);
this.startCode = String.valueOf(info.getStartCode()).
getBytes(UTF8_ENCODING);
} catch(UnsupportedEncodingException e) {
LOG.error(e);
}
}
@Override
@ -1614,7 +1602,7 @@ public class HMaster implements HConstants, HMasterInterface,
if(LOG.isDebugEnabled()) {
LOG.debug(regionName + " open on "
+ new String(serverAddress.get(), UTF8_ENCODING));
+ new String(this.serverAddress, UTF8_ENCODING));
}
// Register the newly-available Region's location.
@ -1708,33 +1696,25 @@ public class HMaster implements HConstants, HMasterInterface,
}
// 1. Check to see if table already exists
MetaRegion m = null;
if(knownMetaRegions.containsKey(newRegion.regionName)) {
m = knownMetaRegions.get(newRegion.regionName);
} else {
m = knownMetaRegions.get(
MetaRegion m = (knownMetaRegions.containsKey(newRegion.regionName))?
knownMetaRegions.get(newRegion.regionName):
knownMetaRegions.get(
knownMetaRegions.headMap(newRegion.regionName).lastKey());
}
Text metaRegionName = m.regionName;
HRegionInterface server = client.getHRegionConnection(m.server);
BytesWritable bytes = server.get(metaRegionName, desc.getName(), COL_REGIONINFO);
if(bytes != null && bytes.getSize() != 0) {
byte[] infoBytes = bytes.get();
byte [] infoBytes =
server.get(metaRegionName, desc.getName(), COL_REGIONINFO);
if (infoBytes != null && infoBytes.length != 0) {
DataInputBuffer inbuf = new DataInputBuffer();
inbuf.reset(infoBytes, infoBytes.length);
HRegionInfo info = new HRegionInfo();
info.readFields(inbuf);
if(info.tableDesc.getName().compareTo(desc.getName()) == 0) {
if (info.tableDesc.getName().compareTo(desc.getName()) == 0) {
throw new IOException("table already exists");
}
}
// 2. Create the HRegion
HRegion r = HRegion.createHRegion(newRegion.regionId, desc, this.dir,
this.conf);
@ -1748,8 +1728,8 @@ public class HMaster implements HConstants, HMasterInterface,
long clientId = rand.nextLong();
long lockid = server.startUpdate(metaRegionName, clientId, regionName);
server.put(metaRegionName, clientId, lockid, COL_REGIONINFO,
new BytesWritable(byteValue.toByteArray()));
server.put(metaRegionName, clientId, lockid, COL_REGIONINFO,
byteValue.toByteArray());
server.commit(metaRegionName, clientId, lockid);
// 4. Close the new region to flush it to disk
@ -1759,7 +1739,7 @@ public class HMaster implements HConstants, HMasterInterface,
// 5. Get it assigned to a server
unassignedRegions.put(regionName, info);
assignAttempts.put(regionName, 0L);
assignAttempts.put(regionName, Long.valueOf(0L));
break;
} catch(NotServingRegionException e) {
@ -1887,30 +1867,26 @@ public class HMaster implements HConstants, HMasterInterface,
}
boolean haveRegionInfo = false;
for(int i = 0; i < values.length; i++) {
bytes = new byte[values[i].getData().getSize()];
if(bytes.length == 0) {
if(values[i].getData().length == 0) {
break;
}
System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length);
Text column = values[i].getKey().getColumn();
if(column.equals(COL_REGIONINFO)) {
haveRegionInfo = true;
inbuf.reset(bytes, bytes.length);
inbuf.reset(values[i].getData(),
values[i].getData().length);
info.readFields(inbuf);
} else if(column.equals(COL_SERVER)) {
try {
serverName = new String(bytes, UTF8_ENCODING);
serverName =
new String(values[i].getData(), UTF8_ENCODING);
} catch(UnsupportedEncodingException e) {
assert(false);
}
} else if(column.equals(COL_STARTCODE)) {
try {
startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
startCode = Long.valueOf(new String(values[i].getData(),
UTF8_ENCODING)).longValue();
} catch(UnsupportedEncodingException e) {
assert(false);
}
@ -2115,8 +2091,7 @@ public class HMaster implements HConstants, HMasterInterface,
i.write(s);
server.put(regionName, clientId, lockid, COL_REGIONINFO,
new BytesWritable(byteValue.toByteArray()));
byteValue.toByteArray());
}
}
@ -2180,29 +2155,24 @@ public class HMaster implements HConstants, HMasterInterface,
}
protected void updateRegionInfo(HRegionInterface server, Text regionName,
HRegionInfo i) throws IOException {
HRegionInfo i)
throws IOException {
ByteArrayOutputStream byteValue = new ByteArrayOutputStream();
DataOutputStream s = new DataOutputStream(byteValue);
i.write(s);
long lockid = -1L;
long clientId = rand.nextLong();
try {
lockid = server.startUpdate(regionName, clientId, i.regionName);
server.put(regionName, clientId, lockid, COL_REGIONINFO,
new BytesWritable(byteValue.toByteArray()));
byteValue.toByteArray());
server.commit(regionName, clientId, lockid);
lockid = -1L;
if(LOG.isDebugEnabled()) {
LOG.debug("updated columns in row: " + i.regionName);
}
} catch(NotServingRegionException e) {
throw e;
} catch(IOException e) {
LOG.error("column update failed in row: " + i.regionName);
LOG.error(e);


@ -57,4 +57,4 @@ public interface HMasterInterface extends VersionedProtocol {
//////////////////////////////////////////////////////////////////////////////
public HServerAddress findRootRegion();
}
}


@ -25,5 +25,6 @@ import org.apache.hadoop.ipc.VersionedProtocol;
public interface HMasterRegionInterface extends VersionedProtocol {
public static final long versionID = 1L;
public void regionServerStartup(HServerInfo info) throws IOException;
public HMsg[] regionServerReport(HServerInfo info, HMsg msgs[]) throws IOException;
public HMsg[] regionServerReport(HServerInfo info, HMsg msgs[])
throws IOException;
}


@ -16,7 +16,9 @@
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
@ -24,7 +26,7 @@ import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Text;
/**
@ -34,13 +36,13 @@ import org.apache.hadoop.io.Text;
public class HMemcache {
private final Log LOG = LogFactory.getLog(this.getClass().getName());
TreeMap<HStoreKey, BytesWritable> memcache
= new TreeMap<HStoreKey, BytesWritable>();
TreeMap<HStoreKey, byte []> memcache =
new TreeMap<HStoreKey, byte []>();
Vector<TreeMap<HStoreKey, BytesWritable>> history
= new Vector<TreeMap<HStoreKey, BytesWritable>>();
Vector<TreeMap<HStoreKey, byte []>> history
= new Vector<TreeMap<HStoreKey, byte []>>();
TreeMap<HStoreKey, BytesWritable> snapshot = null;
TreeMap<HStoreKey, byte []> snapshot = null;
final HLocking lock = new HLocking();
@ -49,7 +51,7 @@ public class HMemcache {
}
public static class Snapshot {
public TreeMap<HStoreKey, BytesWritable> memcacheSnapshot = null;
public TreeMap<HStoreKey, byte []> memcacheSnapshot = null;
public long sequenceId = 0;
public Snapshot() {
@ -92,7 +94,7 @@ public class HMemcache {
retval.memcacheSnapshot = memcache;
this.snapshot = memcache;
history.add(memcache);
memcache = new TreeMap<HStoreKey, BytesWritable>();
memcache = new TreeMap<HStoreKey, byte []>();
retval.sequenceId = log.startCacheFlush();
if(LOG.isDebugEnabled()) {
@ -122,21 +124,18 @@ public class HMemcache {
LOG.debug("deleting snapshot");
}
for(Iterator<TreeMap<HStoreKey, BytesWritable>> it = history.iterator();
for(Iterator<TreeMap<HStoreKey, byte []>> it = history.iterator();
it.hasNext(); ) {
TreeMap<HStoreKey, BytesWritable> cur = it.next();
if(snapshot == cur) {
TreeMap<HStoreKey, byte []> cur = it.next();
if (snapshot == cur) {
it.remove();
break;
}
}
this.snapshot = null;
if(LOG.isDebugEnabled()) {
LOG.debug("snapshot deleted");
}
} finally {
this.lock.releaseWriteLock();
}
@ -144,14 +143,16 @@ public class HMemcache {
/**
* Store a value.
*
* Operation uses a write lock.
* @param row
* @param columns
* @param timestamp
*/
public void add(final Text row, final TreeMap<Text, BytesWritable> columns,
public void add(final Text row, final TreeMap<Text, byte []> columns,
final long timestamp) {
this.lock.obtainWriteLock();
try {
for (Map.Entry<Text, BytesWritable> es: columns.entrySet()) {
for (Map.Entry<Text, byte []> es: columns.entrySet()) {
HStoreKey key = new HStoreKey(row, es.getKey(), timestamp);
memcache.put(key, es.getValue());
}
@ -162,45 +163,47 @@ public class HMemcache {
/**
* Look back through all the backlog TreeMaps to find the target.
*
* We only need a readlock here.
* @param key
* @param numVersions
* @return An array of byte arrays ordered by timestamp.
*/
public BytesWritable[] get(HStoreKey key, int numVersions) {
Vector<BytesWritable> results = new Vector<BytesWritable>();
public byte [][] get(final HStoreKey key, final int numVersions) {
List<byte []> results = new ArrayList<byte[]>();
this.lock.obtainReadLock();
try {
Vector<BytesWritable> result = get(memcache, key, numVersions-results.size());
ArrayList<byte []> result =
get(memcache, key, numVersions - results.size());
results.addAll(0, result);
for(int i = history.size()-1; i >= 0; i--) {
if(numVersions > 0 && results.size() >= numVersions) {
for (int i = history.size() - 1; i >= 0; i--) {
if (numVersions > 0 && results.size() >= numVersions) {
break;
}
result = get(history.elementAt(i), key, numVersions-results.size());
result = get(history.elementAt(i), key, numVersions - results.size());
results.addAll(results.size(), result);
}
return (results.size() == 0)?
null: results.toArray(new BytesWritable[results.size()]);
null: ImmutableBytesWritable.toArray(results);
} finally {
this.lock.releaseReadLock();
}
}
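A hedged read-side sketch: the live memcache is consulted before the history snapshots, so the returned array runs newest to oldest. row and column are assumed Text values already in scope:

    HStoreKey lookup = new HStoreKey(row, column, System.currentTimeMillis());
    byte[][] versions = memcache.get(lookup, 3);   // up to three versions, or null
    if (versions != null) {
      for (byte[] v : versions) {
        System.out.println(new String(v));
      }
    }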
/**
* Return all the available columns for the given key. The key indicates a
* row and timestamp, but not a column name.
*
* The returned object should map column names to byte arrays (byte[]).
* @param key
* @return All columns for given key.
*/
public TreeMap<Text, BytesWritable> getFull(HStoreKey key) {
TreeMap<Text, BytesWritable> results = new TreeMap<Text, BytesWritable>();
public TreeMap<Text, byte []> getFull(HStoreKey key) {
TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
this.lock.obtainReadLock();
try {
internalGetFull(memcache, key, results);
for(int i = history.size()-1; i >= 0; i--) {
TreeMap<HStoreKey, BytesWritable> cur = history.elementAt(i);
for (int i = history.size()-1; i >= 0; i--) {
TreeMap<HStoreKey, byte []> cur = history.elementAt(i);
internalGetFull(cur, key, results);
}
return results;
@ -210,17 +213,16 @@ public class HMemcache {
}
}
void internalGetFull(TreeMap<HStoreKey, BytesWritable> map, HStoreKey key,
TreeMap<Text, BytesWritable> results) {
SortedMap<HStoreKey, BytesWritable> tailMap = map.tailMap(key);
for (Map.Entry<HStoreKey, BytesWritable> es: tailMap.entrySet()) {
void internalGetFull(TreeMap<HStoreKey, byte []> map, HStoreKey key,
TreeMap<Text, byte []> results) {
SortedMap<HStoreKey, byte []> tailMap = map.tailMap(key);
for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
HStoreKey itKey = es.getKey();
Text itCol = itKey.getColumn();
if (results.get(itCol) == null
&& key.matchesWithoutColumn(itKey)) {
BytesWritable val = tailMap.get(itKey);
byte [] val = tailMap.get(itKey);
results.put(itCol, val);
} else if (key.getRow().compareTo(itKey.getRow()) > 0) {
break;
}
@ -235,18 +237,23 @@ public class HMemcache {
*
* TODO - This is kinda slow. We need a data structure that allows for
* proximity-searches, not just precise-matches.
*/
Vector<BytesWritable> get(TreeMap<HStoreKey, BytesWritable> map, HStoreKey key, int numVersions) {
Vector<BytesWritable> result = new Vector<BytesWritable>();
HStoreKey curKey = new HStoreKey(key.getRow(), key.getColumn(), key.getTimestamp());
SortedMap<HStoreKey, BytesWritable> tailMap = map.tailMap(curKey);
for (Map.Entry<HStoreKey, BytesWritable> es: tailMap.entrySet()) {
* @param map
* @param key
* @param numVersions
* @return Ordered list of items found in passed <code>map</code>
*/
ArrayList<byte []> get(final TreeMap<HStoreKey, byte []> map,
final HStoreKey key, final int numVersions) {
ArrayList<byte []> result = new ArrayList<byte []>();
HStoreKey curKey =
new HStoreKey(key.getRow(), key.getColumn(), key.getTimestamp());
SortedMap<HStoreKey, byte []> tailMap = map.tailMap(curKey);
for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
HStoreKey itKey = es.getKey();
if (itKey.matchesRowCol(curKey)) {
result.add(tailMap.get(itKey));
curKey.setVersion(itKey.getTimestamp() - 1);
}
if (numVersions > 0 && result.size() >= numVersions) {
break;
}
@ -269,8 +276,8 @@ public class HMemcache {
//////////////////////////////////////////////////////////////////////////////
class HMemcacheScanner extends HAbstractScanner {
TreeMap<HStoreKey, BytesWritable> backingMaps[];
Iterator<HStoreKey> keyIterators[];
final TreeMap<HStoreKey, byte []> backingMaps[];
final Iterator<HStoreKey> keyIterators[];
@SuppressWarnings("unchecked")
public HMemcacheScanner(long timestamp, Text targetCols[], Text firstRow)
@ -292,7 +299,7 @@ public class HMemcache {
this.keyIterators = new Iterator[backingMaps.length];
this.keys = new HStoreKey[backingMaps.length];
this.vals = new BytesWritable[backingMaps.length];
this.vals = new byte[backingMaps.length][];
// Generate list of iterators
HStoreKey firstKey = new HStoreKey(firstRow);


@ -27,7 +27,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
@ -39,7 +38,9 @@ class HMerge implements HConstants {
static final Log LOG = LogFactory.getLog(HMerge.class);
static final Text[] META_COLS = {COL_REGIONINFO};
private HMerge() {} // Not instantiable
private HMerge() {
// Not instantiable
}
/**
* Scans the table and merges two adjacent regions if they are small. This
@ -317,7 +318,7 @@ class HMerge implements HConstants {
private static class OfflineMerger extends Merger {
private Path dir;
private TreeSet<HRegionInfo> metaRegions;
private TreeMap<Text, BytesWritable> results;
private TreeMap<Text, byte []> results;
OfflineMerger(Configuration conf, FileSystem fs, Text tableName)
throws IOException {
@ -325,7 +326,7 @@ class HMerge implements HConstants {
super(conf, fs, tableName);
this.dir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR));
this.metaRegions = new TreeSet<HRegionInfo>();
this.results = new TreeMap<Text, BytesWritable>();
this.results = new TreeMap<Text, byte []>();
// Scan root region to find all the meta regions
@ -337,10 +338,8 @@ class HMerge implements HConstants {
try {
while(rootScanner.next(key, results)) {
for(BytesWritable b: results.values()) {
byte[] bytes = new byte[b.getSize()];
System.arraycopy(b.get(), 0, bytes, 0, bytes.length);
in.reset(bytes, bytes.length);
for(byte [] b: results.values()) {
in.reset(b, b.length);
info.readFields(in);
metaRegions.add(info);
results.clear();
@ -405,8 +404,7 @@ class HMerge implements HConstants {
long lockid = -1L;
try {
lockid = root.startUpdate(newRegion.getRegionName());
root.put(lockid, COL_REGIONINFO,
new BytesWritable(byteValue.toByteArray()));
root.put(lockid, COL_REGIONINFO, byteValue.toByteArray());
root.commit(lockid);
lockid = -1L;


@ -238,11 +238,11 @@ class HRegion implements HConstants {
// Members
//////////////////////////////////////////////////////////////////////////////
TreeMap<Text, Long> rowsToLocks = new TreeMap<Text, Long>();
TreeMap<Long, Text> locksToRows = new TreeMap<Long, Text>();
TreeMap<Text, HStore> stores = new TreeMap<Text, HStore>();
Map<Long, TreeMap<Text, BytesWritable>> targetColumns
= new HashMap<Long, TreeMap<Text, BytesWritable>>();
Map<Text, Long> rowsToLocks = new HashMap<Text, Long>();
Map<Long, Text> locksToRows = new HashMap<Long, Text>();
Map<Text, HStore> stores = new HashMap<Text, HStore>();
Map<Long, TreeMap<Text, byte []>> targetColumns
= new HashMap<Long, TreeMap<Text, byte []>>();
HMemcache memcache;
@ -826,7 +826,7 @@ class HRegion implements HConstants {
}
HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
TreeMap<HStoreKey, BytesWritable> memcacheSnapshot = retval.memcacheSnapshot;
TreeMap<HStoreKey, byte []> memcacheSnapshot = retval.memcacheSnapshot;
if(memcacheSnapshot == null) {
for(HStore hstore: stores.values()) {
Vector<HStoreFile> hstoreFiles = hstore.getAllMapFiles();
@ -885,31 +885,28 @@ class HRegion implements HConstants {
//////////////////////////////////////////////////////////////////////////////
/** Fetch a single data item. */
BytesWritable get(Text row, Text column) throws IOException {
BytesWritable[] results = get(row, column, Long.MAX_VALUE, 1);
return (results == null)? null: results[0];
byte [] get(Text row, Text column) throws IOException {
byte [][] results = get(row, column, Long.MAX_VALUE, 1);
return (results == null || results.length == 0)? null: results[0];
}
/** Fetch multiple versions of a single data item */
BytesWritable[] get(Text row, Text column, int numVersions) throws IOException {
byte [][] get(Text row, Text column, int numVersions) throws IOException {
return get(row, column, Long.MAX_VALUE, numVersions);
}
/** Fetch multiple versions of a single data item, with timestamp. */
BytesWritable[] get(Text row, Text column, long timestamp, int numVersions)
throws IOException {
byte [][] get(Text row, Text column, long timestamp, int numVersions)
throws IOException {
if(writestate.closed) {
throw new IOException("HRegion is closed.");
}
// Make sure this is a valid row and valid column
checkRow(row);
checkColumn(column);
// Obtain the row-lock
obtainRowLock(row);
try {
// Obtain the -col results
@ -921,13 +918,12 @@ class HRegion implements HConstants {
}
/** Private implementation: get the value for the indicated HStoreKey */
private BytesWritable[] get(HStoreKey key, int numVersions) throws IOException {
private byte [][] get(HStoreKey key, int numVersions) throws IOException {
lock.obtainReadLock();
try {
// Check the memcache
BytesWritable[] result = memcache.get(key, numVersions);
byte [][] result = memcache.get(key, numVersions);
if(result != null) {
return result;
}
@ -957,19 +953,17 @@ class HRegion implements HConstants {
* determine which column groups are useful for that row. That would let us
* avoid a bunch of disk activity.
*/
TreeMap<Text, BytesWritable> getFull(Text row) throws IOException {
TreeMap<Text, byte []> getFull(Text row) throws IOException {
HStoreKey key = new HStoreKey(row, System.currentTimeMillis());
lock.obtainReadLock();
try {
TreeMap<Text, BytesWritable> memResult = memcache.getFull(key);
for(Iterator<Text> it = stores.keySet().iterator(); it.hasNext(); ) {
Text colFamily = it.next();
TreeMap<Text, byte []> memResult = memcache.getFull(key);
for (Text colFamily: stores.keySet()) {
HStore targetStore = stores.get(colFamily);
targetStore.getFull(key, memResult);
}
return memResult;
} finally {
lock.releaseReadLock();
}
@ -1035,9 +1029,8 @@ class HRegion implements HConstants {
* This method really just tests the input, then calls an internal localput()
* method.
*/
void put(long lockid, Text targetCol, BytesWritable val) throws IOException {
if(val.getSize() == DELETE_BYTES.getSize()
&& val.compareTo(DELETE_BYTES) == 0) {
void put(long lockid, Text targetCol, byte [] val) throws IOException {
if (DELETE_BYTES.compareTo(val) == 0) {
throw new IOException("Cannot insert value: " + val);
}
localput(lockid, targetCol, val);
@ -1047,7 +1040,7 @@ class HRegion implements HConstants {
* Delete a value or write a value. This is a just a convenience method for put().
*/
void delete(long lockid, Text targetCol) throws IOException {
localput(lockid, targetCol, DELETE_BYTES);
localput(lockid, targetCol, DELETE_BYTES.get());
}
/**
@ -1063,7 +1056,7 @@ class HRegion implements HConstants {
* @throws IOException
*/
void localput(final long lockid, final Text targetCol,
final BytesWritable val)
final byte [] val)
throws IOException {
checkColumn(targetCol);
@ -1083,9 +1076,9 @@ class HRegion implements HConstants {
lockid + " unexpected aborted by another thread");
}
TreeMap<Text, BytesWritable> targets = this.targetColumns.get(lockid);
TreeMap<Text, byte []> targets = this.targetColumns.get(lockid);
if (targets == null) {
targets = new TreeMap<Text, BytesWritable>();
targets = new TreeMap<Text, byte []>();
this.targetColumns.put(lockid, targets);
}
targets.put(targetCol, val);
@ -1144,8 +1137,7 @@ class HRegion implements HConstants {
synchronized(row) {
// Add updates to the log and add values to the memcache.
long commitTimestamp = System.currentTimeMillis();
TreeMap<Text, BytesWritable> columns =
this.targetColumns.get(lockid);
TreeMap<Text, byte []> columns = this.targetColumns.get(lockid);
if (columns != null && columns.size() > 0) {
log.append(regionInfo.regionName, regionInfo.tableDesc.getName(),
row, columns, commitTimestamp);
@ -1267,7 +1259,7 @@ class HRegion implements HConstants {
*/
private static class HScanner implements HInternalScannerInterface {
private HInternalScannerInterface[] scanners;
private TreeMap<Text, BytesWritable>[] resultSets;
private TreeMap<Text, byte []>[] resultSets;
private HStoreKey[] keys;
private boolean wildcardMatch;
private boolean multipleMatchers;
@ -1323,7 +1315,7 @@ class HRegion implements HConstants {
}
for(int i = 0; i < scanners.length; i++) {
keys[i] = new HStoreKey();
resultSets[i] = new TreeMap<Text, BytesWritable>();
resultSets[i] = new TreeMap<Text, byte []>();
if(scanners[i] != null && !scanners[i].next(keys[i], resultSets[i])) {
closeScanner(i);
}
@ -1351,7 +1343,7 @@ class HRegion implements HConstants {
*
* @see org.apache.hadoop.hbase.HInternalScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, java.util.TreeMap)
*/
public boolean next(HStoreKey key, TreeMap<Text, BytesWritable> results)
public boolean next(HStoreKey key, TreeMap<Text, byte []> results)
throws IOException {
// Find the lowest-possible key.
Text chosenRow = null;
@ -1393,7 +1385,7 @@ class HRegion implements HConstants {
// values with older ones. So now we only insert
// a result if the map does not contain the key.
for(Map.Entry<Text, BytesWritable> e: resultSets[i].entrySet()) {
for(Map.Entry<Text, byte []> e: resultSets[i].entrySet()) {
if(!results.containsKey(e.getKey())) {
results.put(e.getKey(), e.getValue());
insertedItem = true;
@ -1504,7 +1496,7 @@ class HRegion implements HConstants {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
DataOutputStream s = new DataOutputStream(bytes);
r.getRegionInfo().write(s);
meta.put(writeid, COL_REGIONINFO, new BytesWritable(bytes.toByteArray()));
meta.put(writeid, COL_REGIONINFO, bytes.toByteArray());
meta.commit(writeid);
}


@ -15,12 +15,11 @@
*/
package org.apache.hadoop.hbase;
import org.apache.hadoop.io.BytesWritable;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.VersionedProtocol;
import java.io.*;
/*******************************************************************************
* Clients interact with HRegionServers using
* a handle to the HRegionInterface.
@ -36,57 +35,62 @@ public interface HRegionInterface extends VersionedProtocol {
* @return - HRegionInfo object for region
* @throws NotServingRegionException
*/
public HRegionInfo getRegionInfo(final Text regionName) throws NotServingRegionException;
public HRegionInfo getRegionInfo(final Text regionName)
throws NotServingRegionException;
/**
* Retrieve a single value from the specified region for the specified row
* and column keys
*
* @param regionName - name of region
* @param row - row key
* @param column - column key
* @return - value for that region/row/column
* @param regionName name of region
* @param row row key
* @param column column key
* @return value for that region/row/column
* @throws IOException
*/
public BytesWritable get(final Text regionName, final Text row, final Text column) throws IOException;
public byte [] get(final Text regionName, final Text row, final Text column)
throws IOException;
/**
* Get the specified number of versions of the specified row and column
*
* @param regionName - region name
* @param row - row key
* @param column - column key
* @param numVersions - number of versions to return
* @return - array of values
* @param regionName region name
* @param row row key
* @param column column key
* @param numVersions number of versions to return
* @return array of values
* @throws IOException
*/
public BytesWritable[] get(final Text regionName, final Text row,
final Text column, final int numVersions) throws IOException;
public byte [][] get(final Text regionName, final Text row,
final Text column, final int numVersions)
throws IOException;
/**
* Get the specified number of versions of the specified row and column with
* the specified timestamp.
*
* @param regionName - region name
* @param row - row key
* @param column - column key
* @param timestamp - timestamp
* @param numVersions - number of versions to return
* @return - array of values
* @param regionName region name
* @param row row key
* @param column column key
* @param timestamp timestamp
* @param numVersions number of versions to return
* @return array of values
* @throws IOException
*/
public BytesWritable[] get(final Text regionName, final Text row, final Text column,
final long timestamp, final int numVersions) throws IOException;
public byte [][] get(final Text regionName, final Text row,
final Text column, final long timestamp, final int numVersions)
throws IOException;
/**
* Get all the data for the specified row
*
* @param regionName - region name
* @param row - row key
* @return - array of values
* @param regionName region name
* @param row row key
* @return array of values
* @throws IOException
*/
public KeyedData[] getRow(final Text regionName, final Text row) throws IOException;
public KeyedData[] getRow(final Text regionName, final Text row)
throws IOException;
//////////////////////////////////////////////////////////////////////////////
// Start an atomic row insertion/update. No changes are committed until the
@ -110,67 +114,72 @@ public interface HRegionInterface extends VersionedProtocol {
* The client can gain extra time with a call to renewLease().
* Start an atomic row insertion or update
*
* @param regionName - region name
* @param clientid - a unique value to identify the client
* @param row - Name of row to start update against.
* @param regionName region name
* @param clientid a unique value to identify the client
* @param row Name of row to start update against.
* @return Row lockid.
* @throws IOException
*/
public long startUpdate(final Text regionName, final long clientid,
final Text row) throws IOException;
final Text row)
throws IOException;
/**
* Change a value for the specified column
*
* @param regionName - region name
* @param clientid - a unique value to identify the client
* @param lockid - lock id returned from startUpdate
* @param column - column whose value is being set
* @param val - new value for column
* @param regionName region name
* @param clientid a unique value to identify the client
* @param lockid lock id returned from startUpdate
* @param column column whose value is being set
* @param val new value for column
* @throws IOException
*/
public void put(final Text regionName, final long clientid, final long lockid,
final Text column, final BytesWritable val) throws IOException;
final Text column, final byte [] val)
throws IOException;
/**
* Delete the value for a column
*
* @param regionName - region name
* @param clientid - a unique value to identify the client
* @param lockid - lock id returned from startUpdate
* @param column - name of column whose value is to be deleted
* @param regionName region name
* @param clientid a unique value to identify the client
* @param lockid lock id returned from startUpdate
* @param column name of column whose value is to be deleted
* @throws IOException
*/
public void delete(final Text regionName, final long clientid, final long lockid,
final Text column) throws IOException;
public void delete(final Text regionName, final long clientid,
final long lockid, final Text column)
throws IOException;
/**
* Abort a row mutation
*
* @param regionName - region name
* @param clientid - a unique value to identify the client
* @param lockid - lock id returned from startUpdate
* @param regionName region name
* @param clientid a unique value to identify the client
* @param lockid lock id returned from startUpdate
* @throws IOException
*/
public void abort(final Text regionName, final long clientid,
final long lockid) throws IOException;
final long lockid)
throws IOException;
/**
* Finalize a row mutation
*
* @param regionName - region name
* @param clientid - a unique value to identify the client
* @param lockid - lock id returned from startUpdate
* @param regionName region name
* @param clientid a unique value to identify the client
* @param lockid lock id returned from startUpdate
* @throws IOException
*/
public void commit(final Text regionName, final long clientid,
final long lockid) throws IOException;
final long lockid)
throws IOException;
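Taken together, startUpdate/put/delete/commit form the atomic row-update protocol sketched in the comment above: nothing is visible until commit, and abort releases the row lock on failure. A hedged sketch of one round trip (server, regionName, clientid, row, and val are assumed in scope):

    long lockid = server.startUpdate(regionName, clientid, row);
    try {
      server.put(regionName, clientid, lockid, new Text("family:a"), val);
      server.delete(regionName, clientid, lockid, new Text("family:b"));
      server.commit(regionName, clientid, lockid);
    } catch (IOException e) {
      server.abort(regionName, clientid, lockid);
      throw e;
    }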
/**
* Renew lease on update
*
* @param lockid - lock id returned from startUpdate
* @param clientid - a unique value to identify the client
* @param lockid lock id returned from startUpdate
* @param clientid a unique value to identify the client
* @throws IOException
*/
public void renewLease(long lockid, long clientid) throws IOException;
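For orientation, here is a minimal client-side sketch of the update protocol above. It assumes an already-resolved HRegionInterface proxy and region name; the client id, row, column, and value are illustrative, not part of the interface contract.

import java.io.IOException;
import org.apache.hadoop.io.Text;

class UpdateProtocolSketch {
  static void updateRow(final HRegionInterface region, final Text regionName)
  throws IOException {
    final long clientid = 1L;                      // hypothetical client id
    long lockid = region.startUpdate(regionName, clientid, new Text("row_0"));
    try {
      region.put(regionName, clientid, lockid, new Text("info:server"),
        "foo.bar.com:1234".getBytes("UTF-8"));
      region.renewLease(lockid, clientid);         // heartbeat if work is slow
      region.commit(regionName, clientid, lockid); // all puts apply atomically
    } catch (IOException e) {
      region.abort(regionName, clientid, lockid);  // release the row lock
      throw e;
    }
  }
}

If the client dies between startUpdate and commit, the server-side lease created in startUpdate expires and its RegionListener releases the row lock.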
@@ -182,20 +191,21 @@ public interface HRegionInterface extends VersionedProtocol {
/**
* Opens a remote scanner.
*
* @param regionName - name of region to scan
* @param columns - columns to scan
* @param startRow - starting row to scan
* @param regionName name of region to scan
* @param columns columns to scan
* @param startRow starting row to scan
*
* @return scannerId - scanner identifier used in other calls
* @return scannerId scanner identifier used in other calls
* @throws IOException
*/
public long openScanner(Text regionName, Text[] columns, Text startRow) throws IOException;
public long openScanner(Text regionName, Text[] columns, Text startRow)
throws IOException;
/**
* Get the next set of values
*
* @param scannerId - clientId passed to openScanner
* @return - array of values
* @param scannerId clientId passed to openScanner
* @return array of values
* @throws IOException
*/
public KeyedData[] next(long scannerId) throws IOException;
@@ -203,7 +213,7 @@ public interface HRegionInterface extends VersionedProtocol {
/**
* Close a scanner
*
* @param scannerId - the scanner id returned by openScanner
* @param scannerId the scanner id returned by openScanner
* @throws IOException
*/
public void close(long scannerId) throws IOException;
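A matching sketch for the scanner protocol, again against a resolved proxy. The column spec is illustrative, and the assumption that next() signals exhaustion with a zero-length array (rather than null) is the caller's, not spelled out by the interface.

import java.io.IOException;
import org.apache.hadoop.io.Text;

class ScannerProtocolSketch {
  static void dumpColumns(final HRegionInterface region, final Text regionName)
  throws IOException {
    long scannerId = region.openScanner(regionName,
      new Text[] { new Text("info:") }, new Text()); // empty Text: scan from start
    try {
      for (KeyedData[] values = region.next(scannerId); values.length > 0;
          values = region.next(scannerId)) {
        for (int i = 0; i < values.length; i++) {
          System.out.println(values[i].getKey() + ": "
            + values[i].getData().length + " bytes");
        }
      }
    } finally {
      region.close(scannerId); // also cancels the scanner's lease
    }
  }
}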

View File

@@ -33,7 +33,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.RPC;
@@ -396,8 +395,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
Collections.synchronizedSortedMap(new TreeMap<Text, HRegion>());
this.outboundMsgs = new Vector<HMsg>();
this.scanners =
Collections.synchronizedMap(new TreeMap<Text, HInternalScannerInterface>());
// Config'ed params
this.numRetries = conf.getInt("hbase.client.retries.number", 2);
@@ -914,27 +911,26 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#get(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text)
*/
public BytesWritable get(final Text regionName, final Text row,
final Text column) throws IOException {
public byte [] get(final Text regionName, final Text row,
final Text column)
throws IOException {
return getRegion(regionName).get(row, column);
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#get(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, int)
*/
public BytesWritable[] get(final Text regionName, final Text row,
final Text column, final int numVersions) throws IOException {
public byte [][] get(final Text regionName, final Text row,
final Text column, final int numVersions)
throws IOException {
return getRegion(regionName).get(row, column, numVersions);
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#get(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, long, int)
*/
public BytesWritable[] get(final Text regionName, final Text row, final Text column,
public byte [][] get(final Text regionName, final Text row, final Text column,
final long timestamp, final int numVersions) throws IOException {
return getRegion(regionName).get(row, column, timestamp, numVersions);
}
@@ -943,10 +939,10 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
*/
public KeyedData[] getRow(final Text regionName, final Text row) throws IOException {
HRegion region = getRegion(regionName);
TreeMap<Text, BytesWritable> map = region.getFull(row);
TreeMap<Text, byte[]> map = region.getFull(row);
KeyedData result[] = new KeyedData[map.size()];
int counter = 0;
for (Map.Entry<Text, BytesWritable> es: map.entrySet()) {
for (Map.Entry<Text, byte []> es: map.entrySet()) {
result[counter++] =
new KeyedData(new HStoreKey(row, es.getKey()), es.getValue());
}
@@ -957,30 +953,28 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
* @see org.apache.hadoop.hbase.HRegionInterface#next(long)
*/
public KeyedData[] next(final long scannerId)
throws IOException {
Text scannerName = new Text(String.valueOf(scannerId));
throws IOException {
String scannerName = String.valueOf(scannerId);
HInternalScannerInterface s = scanners.get(scannerName);
if (s == null) {
throw new UnknownScannerException("Name: " + scannerName);
}
leases.renewLease(scannerName, scannerName);
leases.renewLease(scannerId, scannerId);
// Collect values to be returned here
ArrayList<KeyedData> values = new ArrayList<KeyedData>();
TreeMap<Text, BytesWritable> results = new TreeMap<Text, BytesWritable>();
TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
// Keep getting rows until we find one that has at least one non-deleted column value
HStoreKey key = new HStoreKey();
while (s.next(key, results)) {
for(Map.Entry<Text, BytesWritable> e: results.entrySet()) {
for(Map.Entry<Text, byte []> e: results.entrySet()) {
HStoreKey k = new HStoreKey(key.getRow(), e.getKey(), key.getTimestamp());
BytesWritable val = e.getValue();
if(val.getSize() == DELETE_BYTES.getSize()
&& val.compareTo(DELETE_BYTES) == 0) {
byte [] val = e.getValue();
if (DELETE_BYTES.compareTo(val) == 0) {
// Column value is deleted. Don't return it.
if (LOG.isDebugEnabled()) {
LOG.debug("skipping deleted value for key: " + k.toString());
@@ -1011,10 +1005,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
throws IOException {
HRegion region = getRegion(regionName);
long lockid = region.startUpdate(row);
this.leases.createLease(new Text(String.valueOf(clientid)),
new Text(String.valueOf(lockid)),
new RegionListener(region, lockid));
this.leases.createLease(clientid, lockid,
new RegionListener(region, lockid));
return lockid;
}
@@ -1041,11 +1033,11 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#put(org.apache.hadoop.io.Text, long, long, org.apache.hadoop.io.Text, org.apache.hadoop.io.BytesWritable)
*/
public void put(Text regionName, long clientid, long lockid, Text column,
BytesWritable val) throws IOException {
public void put(final Text regionName, final long clientid,
final long lockid, final Text column, final byte [] val)
throws IOException {
HRegion region = getRegion(regionName, true);
leases.renewLease(new Text(String.valueOf(clientid)),
new Text(String.valueOf(lockid)));
leases.renewLease(clientid, lockid);
region.put(lockid, column, val);
}
@@ -1053,10 +1045,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
* @see org.apache.hadoop.hbase.HRegionInterface#delete(org.apache.hadoop.io.Text, long, long, org.apache.hadoop.io.Text)
*/
public void delete(Text regionName, long clientid, long lockid, Text column)
throws IOException {
throws IOException {
HRegion region = getRegion(regionName);
leases.renewLease(new Text(String.valueOf(clientid)),
new Text(String.valueOf(lockid)));
leases.renewLease(clientid, lockid);
region.delete(lockid, column);
}
@@ -1064,10 +1055,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
* @see org.apache.hadoop.hbase.HRegionInterface#abort(org.apache.hadoop.io.Text, long, long)
*/
public void abort(Text regionName, long clientid, long lockid)
throws IOException {
throws IOException {
HRegion region = getRegion(regionName, true);
leases.cancelLease(new Text(String.valueOf(clientid)),
new Text(String.valueOf(lockid)));
leases.cancelLease(clientid, lockid);
region.abort(lockid);
}
@@ -1077,8 +1067,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
public void commit(Text regionName, long clientid, long lockid)
throws IOException {
HRegion region = getRegion(regionName, true);
leases.cancelLease(new Text(String.valueOf(clientid)),
new Text(String.valueOf(lockid)));
leases.cancelLease(clientid, lockid);
region.commit(lockid);
}
@@ -1086,8 +1075,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
* @see org.apache.hadoop.hbase.HRegionInterface#renewLease(long, long)
*/
public void renewLease(long lockid, long clientid) throws IOException {
leases.renewLease(new Text(String.valueOf(clientid)),
new Text(String.valueOf(lockid)));
leases.renewLease(clientid, lockid);
}
/**
@@ -1139,29 +1127,31 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// remote scanner interface
//////////////////////////////////////////////////////////////////////////////
Map<Text, HInternalScannerInterface> scanners;
Map<String, HInternalScannerInterface> scanners =
Collections.synchronizedMap(new HashMap<String,
HInternalScannerInterface>());
/**
* Instantiated as a scanner lease.
* If the lease times out, the scanner is closed
*/
private class ScannerListener implements LeaseListener {
private Text scannerName;
private final String scannerName;
ScannerListener(Text scannerName) {
this.scannerName = scannerName;
ScannerListener(final String n) {
this.scannerName = n;
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.LeaseListener#leaseExpired()
*/
public void leaseExpired() {
LOG.info("Scanner " + scannerName + " lease expired");
LOG.info("Scanner " + this.scannerName + " lease expired");
HInternalScannerInterface s = null;
synchronized(scanners) {
s = scanners.remove(scannerName);
s = scanners.remove(this.scannerName);
}
if(s != null) {
if (s != null) {
s.close();
}
}
@@ -1177,11 +1167,11 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
try {
HInternalScannerInterface s = r.getScanner(cols, firstRow);
scannerId = rand.nextLong();
Text scannerName = new Text(String.valueOf(scannerId));
String scannerName = String.valueOf(scannerId);
synchronized(scanners) {
scanners.put(scannerName, s);
}
leases.createLease(scannerName, scannerName,
leases.createLease(scannerId, scannerId,
new ScannerListener(scannerName));
} catch(IOException e) {
LOG.error(e);
@@ -1193,8 +1183,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#close(long)
*/
public void close(long scannerId) throws IOException {
Text scannerName = new Text(String.valueOf(scannerId));
public void close(final long scannerId) throws IOException {
String scannerName = String.valueOf(scannerId);
HInternalScannerInterface s = null;
synchronized(scanners) {
s = scanners.remove(scannerName);
@@ -1203,7 +1193,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
throw new UnknownScannerException(scannerName.toString());
}
s.close();
leases.cancelLease(scannerName, scannerName);
leases.cancelLease(scannerId, scannerId);
}
private static void printUsageAndExit() {

View File

@@ -28,7 +28,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
/**
@@ -168,7 +167,7 @@ class HRegiondirReader {
Text [] families = info.tableDesc.families().keySet().toArray(new Text [] {});
HInternalScannerInterface scanner = r.getScanner(families, new Text());
HStoreKey key = new HStoreKey();
TreeMap<Text, BytesWritable> results = new TreeMap<Text, BytesWritable>();
TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
// Print out table header line.
String s = info.startKey.toString();
String startKey = (s == null || s.length() <= 0)? "<>": s;
@@ -184,19 +183,15 @@ class HRegiondirReader {
// Every line starts with row name followed by column name
// followed by cell content.
while(scanner.next(key, results)) {
for (Map.Entry<Text, BytesWritable> es: results.entrySet()) {
Text colname = es.getKey();
BytesWritable colvalue = es.getValue();
for (Map.Entry<Text, byte []> es: results.entrySet()) {
Text colname = es.getKey();
byte [] colvalue = es.getValue();
Object value = null;
byte[] bytes = new byte[colvalue.getSize()];
if (colname.toString().equals("info:regioninfo")) {
// Then bytes are instance of an HRegionInfo.
System.arraycopy(colvalue, 0, bytes, 0, bytes.length);
value = new HRegionInfo(bytes);
value = new HRegionInfo(colvalue);
} else {
value = new String(bytes, HConstants.UTF8_ENCODING);
value = new String(colvalue, HConstants.UTF8_ENCODING);
}
System.out.println(" " + key + ", " + colname.toString() + ": \"" +
value.toString() + "\"");
}

View File

@@ -24,6 +24,7 @@ import java.util.*;
* HScannerInterface iterates through a set of rows. It's implemented by several classes.
******************************************************************************/
public interface HScannerInterface {
public boolean next(HStoreKey key, TreeMap<Text, byte[]> results) throws IOException;
public boolean next(HStoreKey key, TreeMap<Text, byte[]> results)
throws IOException;
public void close() throws IOException;
}
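A usage sketch, with the caller-side assumptions flagged in comments: the caller passes a reusable key and results map, and next() fills them one row at a time until it returns false.

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.io.Text;

class ScanLoopSketch {
  static void printAll(final HScannerInterface scanner) throws IOException {
    HStoreKey key = new HStoreKey();
    TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
    try {
      while (scanner.next(key, results)) {
        for (Map.Entry<Text, byte []> e: results.entrySet()) {
          System.out.println(key.getRow() + "/" + e.getKey() + " = "
            + new String(e.getValue(), "UTF-8"));
        }
        results.clear(); // assumption: implementations add, callers clear
      }
    } finally {
      scanner.close();
    }
  }
}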

View File

@@ -45,7 +45,8 @@ public class HServerAddress implements Writable {
throw new IllegalArgumentException("Not a host:port pair: " + hostAndPort);
}
String host = hostAndPort.substring(0, colonIndex);
int port = Integer.valueOf(hostAndPort.substring(colonIndex + 1));
int port =
Integer.valueOf(hostAndPort.substring(colonIndex + 1)).intValue();
this.address = new InetSocketAddress(host, port);
this.stringValue = hostAndPort;
}

View File

@@ -18,7 +18,10 @@ package org.apache.hadoop.hbase;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
@@ -31,7 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
@@ -40,14 +43,14 @@ import org.apache.hadoop.io.WritableComparable;
import org.onelab.filter.*;
/*******************************************************************************
/**
* HStore maintains a bunch of data files. It is responsible for maintaining
* the memory/file hierarchy and for periodic flushes to disk and compacting
* edits to the file.
*
* Locking and transactions are handled at a higher level. This API should not
* be called directly by any writer, but rather by an HRegion manager.
******************************************************************************/
*/
class HStore implements HConstants {
private static final Log LOG = LogFactory.getLog(HStore.class);
@@ -71,8 +74,8 @@ class HStore implements HConstants {
Path filterDir;
Filter bloomFilter;
Integer compactLock = 0;
Integer flushLock = 0;
Integer compactLock = new Integer(0);
Integer flushLock = new Integer(0);
final HLocking lock = new HLocking();
@@ -81,10 +84,6 @@ class HStore implements HConstants {
Random rand = new Random();
//////////////////////////////////////////////////////////////////////////////
// Constructors, destructors, etc
//////////////////////////////////////////////////////////////////////////////
/**
* An HStore is a set of zero or more MapFiles, which stretch backwards over
* time. A given HStore is responsible for a certain set of columns for a
@@ -109,12 +108,12 @@ class HStore implements HConstants {
* <p>It's assumed that after this constructor returns, the reconstructionLog
* file will be deleted (by whoever has instantiated the HStore).
*
* @param dir - log file directory
* @param regionName - name of region
* @param family - name of column family
* @param fs - file system object
* @param reconstructionLog - existing log file to apply if any
* @param conf - configuration object
* @param dir log file directory
* @param regionName name of region
* @param family name of column family
* @param fs file system object
* @param reconstructionLog existing log file to apply if any
* @param conf configuration object
* @throws IOException
*/
HStore(Path dir, Text regionName, HColumnDescriptor family,
@@ -178,9 +177,8 @@ class HStore implements HConstants {
// file, the entry in 'mapdir' must be deleted.
Vector<HStoreFile> hstoreFiles
= HStoreFile.loadHStoreFiles(conf, dir, regionName, familyName, fs);
for(Iterator<HStoreFile> it = hstoreFiles.iterator(); it.hasNext(); ) {
HStoreFile hsf = it.next();
mapFiles.put(hsf.loadInfo(fs), hsf);
for(HStoreFile hsf: hstoreFiles) {
mapFiles.put(Long.valueOf(hsf.loadInfo(fs)), hsf);
}
// Now go through all the HSTORE_LOGINFOFILEs and figure out the
@@ -192,8 +190,7 @@ class HStore implements HConstants {
// means it was built prior to the previous run of HStore, and so it cannot
// contain any updates also contained in the log.
long maxSeqID = -1;
for (Iterator<HStoreFile> it = hstoreFiles.iterator(); it.hasNext(); ) {
HStoreFile hsf = it.next();
for (HStoreFile hsf: hstoreFiles) {
long seqid = hsf.loadInfo(fs);
if(seqid > 0) {
if(seqid > maxSeqID) {
@@ -202,68 +199,8 @@ class HStore implements HConstants {
}
}
// If a bloom filter is enabled, try to read it in.
// If it doesn't exist, create it.
doReconstructionLog(reconstructionLog, maxSeqID);
// Read the reconstructionLog to see whether we need to build a brand-new
// MapFile out of non-flushed log entries.
//
// We can ignore any log message that has a sequence ID that's equal to or
// lower than maxSeqID. (Because we know such log messages are already
// reflected in the MapFiles.)
if(LOG.isDebugEnabled()) {
LOG.debug("reading reconstructionLog");
}
if(reconstructionLog != null && fs.exists(reconstructionLog)) {
long maxSeqIdInLog = -1;
TreeMap<HStoreKey, BytesWritable> reconstructedCache
= new TreeMap<HStoreKey, BytesWritable>();
SequenceFile.Reader login
= new SequenceFile.Reader(fs, reconstructionLog, conf);
try {
HLogKey key = new HLogKey();
HLogEdit val = new HLogEdit();
while(login.next(key, val)) {
maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum());
if (key.getLogSeqNum() <= maxSeqID) {
continue;
}
// Check this edit is for me. Also, guard against writing
// METACOLUMN info such as HBASE::CACHEFLUSH entries
Text column = val.getColumn();
if (column.equals(HLog.METACOLUMN)
|| !key.getRegionName().equals(this.regionName)
|| !HStoreKey.extractFamily(column).equals(this.familyName)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Passing on edit " + key.getRegionName() + ", "
+ column.toString() + ": " + new String(val.getVal().get())
+ ", my region: " + this.regionName + ", my column: "
+ this.familyName);
}
continue;
}
byte[] bytes = new byte[val.getVal().getSize()];
System.arraycopy(val.getVal().get(), 0, bytes, 0, bytes.length);
HStoreKey k = new HStoreKey(key.getRow(), column,val.getTimestamp());
if (LOG.isDebugEnabled()) {
LOG.debug("Applying edit " + k.toString() + "="
+ new String(bytes, UTF8_ENCODING));
}
reconstructedCache.put(k, new BytesWritable(bytes));
}
} finally {
login.close();
}
if(reconstructedCache.size() > 0) {
// We create a "virtual flush" at maxSeqIdInLog+1.
if(LOG.isDebugEnabled()) {
LOG.debug("flushing reconstructionCache");
}
flushCacheHelper(reconstructedCache, maxSeqIdInLog+1, true);
}
}
// Compact all the MapFiles into a single file. The resulting MapFile
// should be "timeless"; that is, it should not have an associated seq-ID,
// because all log messages have been reflected in the TreeMaps at this
@@ -286,6 +223,70 @@ class HStore implements HConstants {
LOG.info("HStore online for " + this.regionName + "/" + this.familyName);
}
/*
* Read the reconstructionLog to see whether we need to build a brand-new
* MapFile out of non-flushed log entries.
*
* We can ignore any log message that has a sequence ID that's equal to or
* lower than maxSeqID. (Because we know such log messages are already
* reflected in the MapFiles.)
*/
private void doReconstructionLog(final Path reconstructionLog,
final long maxSeqID)
throws UnsupportedEncodingException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("reading reconstructionLog");
}
if (reconstructionLog == null || !fs.exists(reconstructionLog)) {
return;
}
long maxSeqIdInLog = -1;
TreeMap<HStoreKey, byte []> reconstructedCache =
new TreeMap<HStoreKey, byte []>();
SequenceFile.Reader login =
new SequenceFile.Reader(this.fs, reconstructionLog, this.conf);
try {
HLogKey key = new HLogKey();
HLogEdit val = new HLogEdit();
while (login.next(key, val)) {
maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum());
if (key.getLogSeqNum() <= maxSeqID) {
continue;
}
// Check this edit is for me. Also, guard against writing
// METACOLUMN info such as HBASE::CACHEFLUSH entries
Text column = val.getColumn();
if (column.equals(HLog.METACOLUMN)
|| !key.getRegionName().equals(this.regionName)
|| !HStoreKey.extractFamily(column).equals(this.familyName)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Passing on edit " + key.getRegionName() + ", "
+ column.toString() + ": " + new String(val.getVal())
+ ", my region: " + this.regionName + ", my column: "
+ this.familyName);
}
continue;
}
HStoreKey k = new HStoreKey(key.getRow(), column, val.getTimestamp());
if (LOG.isDebugEnabled()) {
LOG.debug("Applying edit " + k.toString() + "=" +
new String(val.getVal(), UTF8_ENCODING));
}
reconstructedCache.put(k, val.getVal());
}
} finally {
login.close();
}
if (reconstructedCache.size() > 0) {
// We create a "virtual flush" at maxSeqIdInLog+1.
if (LOG.isDebugEnabled()) {
LOG.debug("flushing reconstructionCache");
}
flushCacheHelper(reconstructedCache, maxSeqIdInLog + 1, true);
}
}
//////////////////////////////////////////////////////////////////////////////
// Bloom filters
//////////////////////////////////////////////////////////////////////////////
@@ -423,15 +424,20 @@ class HStore implements HConstants {
/**
* Get a MapFile writer
* This allows us to substitute a BloomFilterWriter if a bloom filter is enabled
* This allows us to substitute a BloomFilterWriter if a bloom filter is
* enabled
*
* @param dirName Directory with store files.
* @return Map file.
* @throws IOException
*/
MapFile.Writer getMapFileWriter(String dirName) throws IOException {
if(bloomFilter != null) {
if (bloomFilter != null) {
return new BloomFilterWriter(conf, fs, dirName, HStoreKey.class,
BytesWritable.class, compression);
ImmutableBytesWritable.class, compression);
}
return new MapFile.Writer(conf, fs, dirName, HStoreKey.class,
BytesWritable.class, compression);
ImmutableBytesWritable.class, compression);
}
//////////////////////////////////////////////////////////////////////////////
@@ -440,6 +446,7 @@ class HStore implements HConstants {
/**
* Turn off all the MapFile readers
*
* @throws IOException
*/
void close() throws IOException {
@@ -478,14 +485,15 @@ class HStore implements HConstants {
* @return - Vector of all the HStoreFiles in use
* @throws IOException
*/
Vector<HStoreFile> flushCache(TreeMap<HStoreKey, BytesWritable> inputCache,
long logCacheFlushId) throws IOException {
Vector<HStoreFile> flushCache(TreeMap<HStoreKey, byte []> inputCache,
long logCacheFlushId)
throws IOException {
return flushCacheHelper(inputCache, logCacheFlushId, true);
}
Vector<HStoreFile> flushCacheHelper(TreeMap<HStoreKey, BytesWritable> inputCache,
long logCacheFlushId, boolean addToAvailableMaps) throws IOException {
Vector<HStoreFile> flushCacheHelper(TreeMap<HStoreKey, byte []> inputCache,
long logCacheFlushId, boolean addToAvailableMaps)
throws IOException {
synchronized(flushLock) {
if(LOG.isDebugEnabled()) {
@@ -503,12 +511,11 @@ class HStore implements HConstants {
}
MapFile.Writer out = getMapFileWriter(mapfile.toString());
try {
for (Map.Entry<HStoreKey, BytesWritable> es: inputCache.entrySet()) {
for (Map.Entry<HStoreKey, byte []> es: inputCache.entrySet()) {
HStoreKey curkey = es.getKey();
if (this.familyName.equals(HStoreKey.extractFamily(curkey.getColumn()))) {
out.append(curkey, es.getValue());
out.append(curkey, new ImmutableBytesWritable(es.getValue()));
}
}
if(LOG.isDebugEnabled()) {
@@ -539,13 +546,13 @@ class HStore implements HConstants {
this.lock.obtainWriteLock();
try {
maps.put(logCacheFlushId, getMapFileReader(mapfile.toString()));
mapFiles.put(logCacheFlushId, flushedFile);
Long flushid = Long.valueOf(logCacheFlushId);
maps.put(flushid, getMapFileReader(mapfile.toString()));
mapFiles.put(flushid, flushedFile);
if(LOG.isDebugEnabled()) {
LOG.debug("HStore available for " + this.regionName + "/"
+ this.familyName + " flush id=" + logCacheFlushId);
}
} finally {
this.lock.releaseWriteLock();
}
@@ -627,7 +634,7 @@ class HStore implements HConstants {
}
}
if(LOG.isDebugEnabled()) {
LOG.debug("max sequence id =" + maxSeenSeqID);
LOG.debug("max sequence id: " + maxSeenSeqID);
}
HStoreFile compactedOutputFile
@@ -645,10 +652,8 @@ class HStore implements HConstants {
}
// Step through them, writing to the brand-new TreeMap
MapFile.Writer compactedOut =
getMapFileWriter(compactedOutputFile.getMapFilePath().toString());
try {
// We create a new set of MapFile.Reader objects so we don't screw up
@@ -665,14 +670,15 @@ class HStore implements HConstants {
MapFile.Reader[] readers = new MapFile.Reader[toCompactFiles.size()];
HStoreKey[] keys = new HStoreKey[toCompactFiles.size()];
BytesWritable[] vals = new BytesWritable[toCompactFiles.size()];
ImmutableBytesWritable[] vals =
new ImmutableBytesWritable[toCompactFiles.size()];
boolean[] done = new boolean[toCompactFiles.size()];
int pos = 0;
for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
HStoreFile hsf = it.next();
readers[pos] = getMapFileReader(hsf.getMapFilePath().toString());
keys[pos] = new HStoreKey();
vals[pos] = new BytesWritable();
vals[pos] = new ImmutableBytesWritable();
done[pos] = false;
pos++;
}
@@ -942,7 +948,7 @@ class HStore implements HConstants {
// Fail here? No worries.
long orderVal = finalCompactedFile.loadInfo(fs);
Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
// 6. Loading the new TreeMap.
@@ -973,27 +979,24 @@ class HStore implements HConstants {
*
* The returned object should map column names to byte arrays (byte[]).
*/
void getFull(HStoreKey key, TreeMap<Text, BytesWritable> results) throws IOException {
void getFull(HStoreKey key, TreeMap<Text, byte []> results)
throws IOException {
this.lock.obtainReadLock();
try {
MapFile.Reader[] maparray
= maps.values().toArray(new MapFile.Reader[maps.size()]);
for(int i = maparray.length-1; i >= 0; i--) {
for (int i = maparray.length - 1; i >= 0; i--) {
MapFile.Reader map = maparray[i];
synchronized(map) {
BytesWritable readval = new BytesWritable();
map.reset();
ImmutableBytesWritable readval = new ImmutableBytesWritable();
HStoreKey readkey = (HStoreKey)map.getClosest(key, readval);
do {
Text readcol = readkey.getColumn();
if(results.get(readcol) == null
if (results.get(readcol) == null
&& key.matchesWithoutColumn(readkey)) {
results.put(new Text(readcol), readval);
readval = new BytesWritable();
results.put(new Text(readcol), readval.get());
readval = new ImmutableBytesWritable();
} else if(key.getRow().compareTo(readkey.getRow()) > 0) {
break;
}
@@ -1013,12 +1016,12 @@ class HStore implements HConstants {
*
* 'numVersions' must be greater than zero; if fewer versions exist, all
* available versions are returned.
*/
BytesWritable[] get(HStoreKey key, int numVersions) throws IOException {
byte [][] get(HStoreKey key, int numVersions) throws IOException {
if (numVersions <= 0) {
throw new IllegalArgumentException("Number of versions must be > 0");
}
Vector<BytesWritable> results = new Vector<BytesWritable>();
List<byte []> results = new ArrayList<byte []>();
this.lock.obtainReadLock();
try {
MapFile.Reader[] maparray
@@ -1028,7 +1031,7 @@ class HStore implements HConstants {
MapFile.Reader map = maparray[i];
synchronized(map) {
BytesWritable readval = new BytesWritable();
ImmutableBytesWritable readval = new ImmutableBytesWritable();
map.reset();
HStoreKey readkey = (HStoreKey)map.getClosest(key, readval);
if (readkey == null) {
@@ -1039,14 +1042,14 @@ class HStore implements HConstants {
continue;
}
if (readkey.matchesRowCol(key)) {
results.add(readval);
readval = new BytesWritable();
results.add(readval.get());
readval = new ImmutableBytesWritable();
while(map.next(readkey, readval) && readkey.matchesRowCol(key)) {
if (numVersions > 0 && (results.size() >= numVersions)) {
break;
}
results.add(readval);
readval = new BytesWritable();
results.add(readval.get());
readval = new ImmutableBytesWritable();
}
}
}
@@ -1056,8 +1059,7 @@ class HStore implements HConstants {
}
return results.size() == 0 ?
null :results.toArray(new BytesWritable[results.size()]);
null : ImmutableBytesWritable.toArray(results);
} finally {
this.lock.releaseReadLock();
}
@@ -1077,17 +1079,12 @@ class HStore implements HConstants {
this.lock.obtainReadLock();
try {
long mapIndex = 0L;
Long mapIndex = Long.valueOf(0L);
// Iterate through all the MapFiles
for(Iterator<Map.Entry<Long, HStoreFile>> it = mapFiles.entrySet().iterator();
it.hasNext(); ) {
Map.Entry<Long, HStoreFile> e = it.next();
for(Map.Entry<Long, HStoreFile> e: mapFiles.entrySet()) {
HStoreFile curHSF = e.getValue();
long size = fs.getLength(new Path(curHSF.getMapFilePath(), MapFile.DATA_FILE_NAME));
long size = fs.getLength(
new Path(curHSF.getMapFilePath(), MapFile.DATA_FILE_NAME));
if(size > maxSize) { // This is the largest one so far
maxSize = size;
mapIndex = e.getKey();
@@ -1095,12 +1092,9 @@ class HStore implements HConstants {
}
MapFile.Reader r = maps.get(mapIndex);
midKey.set(((HStoreKey)r.midKey()).getRow());
} catch(IOException e) {
LOG.warn(e);
} finally {
this.lock.releaseReadLock();
}
@@ -1171,14 +1165,12 @@ class HStore implements HConstants {
}
this.keys = new HStoreKey[readers.length];
this.vals = new BytesWritable[readers.length];
this.vals = new byte[readers.length][];
// Advance the readers to the first pos.
for(i = 0; i < readers.length; i++) {
keys[i] = new HStoreKey();
vals[i] = new BytesWritable();
if(firstRow.getLength() != 0) {
if(findFirstRow(i, firstRow)) {
continue;
@@ -1208,16 +1200,15 @@ class HStore implements HConstants {
*/
@Override
boolean findFirstRow(int i, Text firstRow) throws IOException {
ImmutableBytesWritable ibw = new ImmutableBytesWritable();
HStoreKey firstKey
= (HStoreKey)readers[i].getClosest(new HStoreKey(firstRow), vals[i]);
if(firstKey == null) {
= (HStoreKey)readers[i].getClosest(new HStoreKey(firstRow), ibw);
if (firstKey == null) {
// Didn't find it. Close the scanner and return TRUE
closeSubScanner(i);
return true;
}
this.vals[i] = ibw.get();
keys[i].setRow(firstKey.getRow());
keys[i].setColumn(firstKey.getColumn());
keys[i].setVersion(firstKey.getTimestamp());
@@ -1232,11 +1223,12 @@ class HStore implements HConstants {
*/
@Override
boolean getNext(int i) throws IOException {
vals[i] = new BytesWritable();
if(! readers[i].next(keys[i], vals[i])) {
ImmutableBytesWritable ibw = new ImmutableBytesWritable();
if (!readers[i].next(keys[i], ibw)) {
closeSubScanner(i);
return false;
}
vals[i] = ibw.get();
return true;
}

View File

@@ -15,6 +15,7 @@
*/
package org.apache.hadoop.hbase;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
@@ -213,41 +214,34 @@ public class HStoreFile implements HConstants, WritableComparable {
MapFile.Reader in = new MapFile.Reader(fs, getMapFilePath().toString(), conf);
try {
MapFile.Writer outA = new MapFile.Writer(conf, fs,
dstA.getMapFilePath().toString(), HStoreKey.class, BytesWritable.class);
dstA.getMapFilePath().toString(), HStoreKey.class,
ImmutableBytesWritable.class);
try {
MapFile.Writer outB = new MapFile.Writer(conf, fs,
dstB.getMapFilePath().toString(), HStoreKey.class, BytesWritable.class);
dstB.getMapFilePath().toString(), HStoreKey.class,
ImmutableBytesWritable.class);
try {
HStoreKey readkey = new HStoreKey();
BytesWritable readval = new BytesWritable();
ImmutableBytesWritable readval = new ImmutableBytesWritable();
while(in.next(readkey, readval)) {
Text key = readkey.getRow();
if(key.compareTo(midKey) < 0) {
outA.append(readkey, readval);
} else {
outB.append(readkey, readval);
}
}
} finally {
outB.close();
}
} finally {
outA.close();
}
} finally {
in.close();
}
// Build an InfoFile for each output
long seqid = loadInfo(fs);
dstA.writeInfo(fs, seqid);
dstB.writeInfo(fs, seqid);
@@ -262,8 +256,9 @@ public class HStoreFile implements HConstants, WritableComparable {
// Copy all the source MapFile tuples into this HSF's MapFile
MapFile.Writer out = new MapFile.Writer(conf, fs, getMapFilePath().toString(),
HStoreKey.class, BytesWritable.class);
MapFile.Writer out = new MapFile.Writer(conf, fs,
getMapFilePath().toString(),
HStoreKey.class, ImmutableBytesWritable.class);
try {
for(Iterator<HStoreFile> it = srcFiles.iterator(); it.hasNext(); ) {
@@ -272,11 +267,10 @@ public class HStoreFile implements HConstants, WritableComparable {
try {
HStoreKey readkey = new HStoreKey();
BytesWritable readval = new BytesWritable();
ImmutableBytesWritable readval = new ImmutableBytesWritable();
while(in.next(readkey, readval)) {
out.append(readkey, readval);
}
} finally {
in.close();
}
@@ -287,12 +281,10 @@ public class HStoreFile implements HConstants, WritableComparable {
}
// Build a unified InfoFile from the source InfoFiles.
long unifiedSeqId = -1;
for(Iterator<HStoreFile> it = srcFiles.iterator(); it.hasNext(); ) {
HStoreFile hsf = it.next();
long curSeqId = hsf.loadInfo(fs);
if(curSeqId > unifiedSeqId) {
unifiedSeqId = curSeqId;
}

View File

@@ -19,25 +19,72 @@ import org.apache.hadoop.io.*;
import java.io.*;
/*******************************************************************************
/**
* A Key for a stored row
******************************************************************************/
*/
public class HStoreKey implements WritableComparable {
// TODO: Move these utility methods elsewhere (To a Column class?).
/**
* Extracts the column family name from a column
* For example, returns 'info' if the specified column was 'info:server'
* @param col name of column
* @return column family name
* @throws InvalidColumnNameException
*/
public static Text extractFamily(final Text col)
throws InvalidColumnNameException {
return extractFamily(col, false);
}
/**
* Extracts the column family name from a column
* For example, returns 'info' if the specified column was 'info:server'
*
* @param col - name of column
* @return - column family name
* @param col name of column
* @param withColon if returned family name should include the ':' suffix.
* @return column family name
* @throws InvalidColumnNameException
*/
public static Text extractFamily(Text col) {
String column = col.toString();
int colpos = column.indexOf(":");
if(colpos < 0) {
throw new IllegalArgumentException("Illegal column name has no family indicator: " + column);
public static Text extractFamily(final Text col, final boolean withColon)
throws InvalidColumnNameException {
int offset = getColonOffset(col);
// Include ':' in copy?
offset += (withColon)? 1: 0;
if (offset == col.getLength()) {
return col;
}
return new Text(column.substring(0, colpos));
byte [] buffer = new byte[offset];
System.arraycopy(col.getBytes(), 0, buffer, 0, offset);
return new Text(buffer);
}
/**
* Extracts the column qualifier, the portion that follows the colon (':')
* family/qualifier separator.
* For example, returns 'server' if the specified column was 'info:server'
* @param col name of column
* @return column qualifier or null if there is no qualifier.
* @throws InvalidColumnNameException
*/
public static Text extractQualifier(final Text col)
throws InvalidColumnNameException {
int offset = getColonOffset(col);
if (offset + 1 == col.getLength()) {
return null;
}
int bufferLength = col.getLength() - (offset + 1);
byte [] buffer = new byte[bufferLength];
System.arraycopy(col.getBytes(), offset + 1, buffer, 0, bufferLength);
return new Text(buffer);
}
private static int getColonOffset(final Text col)
throws InvalidColumnNameException {
int offset = col.find(":");
if(offset < 0) {
throw new InvalidColumnNameException(col + " is missing the colon " +
"family/qualifier separator");
}
return offset;
}
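A small illustrative driver for the three helpers above; the expected outputs in the comments are derived from the code, not captured from a run.

import org.apache.hadoop.io.Text;

class ColumnNameSketch {
  public static void main(String[] args) throws Exception {
    Text col = new Text("info:server");
    System.out.println(HStoreKey.extractFamily(col));       // info
    System.out.println(HStoreKey.extractFamily(col, true)); // info:
    System.out.println(HStoreKey.extractQualifier(col));    // server
    // A bare family ends at the separator, so the qualifier is null.
    System.out.println(HStoreKey.extractQualifier(new Text("info:"))); // null
    try {
      HStoreKey.extractFamily(new Text("noseparator")); // no colon: rejected
    } catch (InvalidColumnNameException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}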
Text row;
@@ -68,8 +115,8 @@ public class HStoreKey implements WritableComparable {
* Create an HStoreKey specifying the row and timestamp
* The column name defaults to the empty string
*
* @param row - row key
* @param timestamp - timestamp value
* @param row row key
* @param timestamp timestamp value
*/
public HStoreKey(Text row, long timestamp) {
this.row = new Text(row);
@@ -81,8 +128,8 @@ public class HStoreKey implements WritableComparable {
* Create an HStoreKey specifying the row and column names
* The timestamp defaults to Long.MAX_VALUE
*
* @param row - row key
* @param column - column key
* @param row row key
* @param column column key
*/
public HStoreKey(Text row, Text column) {
this.row = new Text(row);
@@ -93,9 +140,9 @@ public class HStoreKey implements WritableComparable {
/**
* Create an HStoreKey specifying all the fields
*
* @param row - row key
* @param column - column key
* @param timestamp - timestamp value
* @param row row key
* @param column column key
* @param timestamp timestamp value
*/
public HStoreKey(Text row, Text column, long timestamp) {
this.row = new Text(row);
@@ -106,7 +153,7 @@ public class HStoreKey implements WritableComparable {
/**
* Construct a new HStoreKey from another
*
* @param other - the source key
* @param other the source key
*/
public HStoreKey(HStoreKey other) {
this();
@@ -118,7 +165,7 @@ public class HStoreKey implements WritableComparable {
/**
* Change the value of the row key
*
* @param newrow - new row key value
* @param newrow new row key value
*/
public void setRow(Text newrow) {
this.row.set(newrow);
@@ -127,7 +174,7 @@ public class HStoreKey implements WritableComparable {
/**
* Change the value of the column key
*
* @param newcol - new column key value
* @param newcol new column key value
*/
public void setColumn(Text newcol) {
this.column.set(newcol);
@@ -136,7 +183,7 @@ public class HStoreKey implements WritableComparable {
/**
* Change the value of the timestamp field
*
* @param timestamp - new timestamp value
* @param timestamp new timestamp value
*/
public void setVersion(long timestamp) {
this.timestamp = timestamp;
@@ -145,7 +192,7 @@ public class HStoreKey implements WritableComparable {
/**
* Set the value of this HStoreKey from the supplied key
*
* @param k - key value to copy
* @param k key value to copy
*/
public void set(HStoreKey k) {
this.row = k.getRow();
@@ -192,16 +239,18 @@ public class HStoreKey implements WritableComparable {
}
/**
* @param other Key to compare against. Compares row and column family
* @param that Key to compare against. Compares row and column family
*
* @return true if same row and column family
* @throws InvalidColumnNameException
* @see #matchesRowCol(HStoreKey)
* @see #matchesWithoutColumn(HStoreKey)
*/
public boolean matchesRowFamily(HStoreKey other) {
return this.row.compareTo(other.row) == 0
&& extractFamily(this.column).compareTo(
extractFamily(other.getColumn())) == 0;
public boolean matchesRowFamily(HStoreKey that)
throws InvalidColumnNameException {
return this.row.compareTo(that.row) == 0 &&
extractFamily(this.column).
compareTo(extractFamily(that.getColumn())) == 0;
}
@Override
@@ -234,11 +283,9 @@ public class HStoreKey implements WritableComparable {
int result = this.row.compareTo(other.row);
if(result == 0) {
result = this.column.compareTo(other.column);
if(result == 0) {
if(this.timestamp < other.timestamp) {
result = 1;
} else if(this.timestamp > other.timestamp) {
result = -1;
}
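The inverted timestamp comparison is what makes the newest version of a cell sort first; a tiny illustration (timestamps arbitrary):

import org.apache.hadoop.io.Text;

class KeyOrderSketch {
  public static void main(String[] args) {
    HStoreKey older = new HStoreKey(new Text("row"), new Text("info:a"), 100L);
    HStoreKey newer = new HStoreKey(new Text("row"), new Text("info:a"), 200L);
    // Rows and columns sort ascending, timestamps descending:
    System.out.println(newer.compareTo(older)); // negative: newer sorts first
  }
}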

View File

@@ -24,31 +24,30 @@ import java.io.*;
******************************************************************************/
public class KeyedData implements Writable {
HStoreKey key;
BytesWritable data;
byte [] data;
/** Default constructor. Used by Writable interface */
public KeyedData() {
this.key = new HStoreKey();
this.data = new BytesWritable();
}
/**
* Create a KeyedData object specifying the parts
* @param key - HStoreKey
* @param data - BytesWritable
* @param key HStoreKey
* @param data value bytes for this key
*/
public KeyedData(HStoreKey key, BytesWritable data) {
public KeyedData(HStoreKey key, byte [] data) {
this.key = key;
this.data = data;
}
/** @return - returns the key */
/** @return returns the key */
public HStoreKey getKey() {
return key;
}
/** @return - returns the value */
public BytesWritable getData() {
public byte [] getData() {
return data;
}
@ -61,7 +60,8 @@ public class KeyedData implements Writable {
*/
public void write(DataOutput out) throws IOException {
key.write(out);
data.write(out);
out.writeShort(this.data.length);
out.write(this.data);
}
/* (non-Javadoc)
@ -69,6 +69,7 @@ public class KeyedData implements Writable {
*/
public void readFields(DataInput in) throws IOException {
key.readFields(in);
data.readFields(in);
this.data = new byte[in.readShort()];
in.readFully(this.data);
}
}
}
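The new wire format frames the value with a two-byte length, which implicitly caps a single value at Short.MAX_VALUE bytes. A round-trip sketch under that assumption:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.Text;

class KeyedDataRoundTrip {
  public static void main(String[] args) throws IOException {
    KeyedData sent = new KeyedData(
      new HStoreKey(new Text("row1"), new Text("info:server")),
      "foo.bar.com:1234".getBytes("UTF-8"));
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    sent.write(new DataOutputStream(buf)); // key, then short length, then bytes
    KeyedData received = new KeyedData();
    received.readFields(
      new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
    System.out.println(new String(received.getData(), "UTF-8"));
  }
}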

View File

@@ -16,14 +16,14 @@
package org.apache.hadoop.hbase;
/*******************************************************************************
/**
* LeaseListener is an interface meant to be implemented by users of the Leases
* class.
*
* It receives events from the Leases class about the status of its accompanying
* lease. Users of the Leases class can use a LeaseListener subclass to, for
* example, clean up resources after a lease has expired.
******************************************************************************/
*/
public interface LeaseListener {
/** When a lease expires, this method is called. */
public void leaseExpired();

View File

@@ -17,42 +17,41 @@ package org.apache.hadoop.hbase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.*;
import java.io.*;
import java.util.*;
/**
* Leases
*
* There are several server classes in HBase that need to track external clients
* that occasionally send heartbeats.
* There are several server classes in HBase that need to track external
* clients that occasionally send heartbeats.
*
* These external clients hold resources in the server class. Those resources
* need to be released if the external client fails to send a heartbeat after
* some interval of time passes.
*
* The Leases class is a general reusable class for this kind of pattern.
* <p>These external clients hold resources in the server class.
* Those resources need to be released if the external client fails to send a
* heartbeat after some interval of time passes.
*
* <p>The Leases class is a general reusable class for this kind of pattern.
* An instance of the Leases class will create a thread to do its dirty work.
* You should close() the instance if you want to clean up the thread properly.
*/
public class Leases {
static final Log LOG = LogFactory.getLog(Leases.class.getName());
protected static final Log LOG = LogFactory.getLog(Leases.class.getName());
long leasePeriod;
long leaseCheckFrequency;
LeaseMonitor leaseMonitor;
Thread leaseMonitorThread;
TreeMap<Text, Lease> leases = new TreeMap<Text, Lease>();
TreeSet<Lease> sortedLeases = new TreeSet<Lease>();
boolean running = true;
protected final long leasePeriod;
protected final long leaseCheckFrequency;
private final LeaseMonitor leaseMonitor;
private final Thread leaseMonitorThread;
protected final Map<LeaseName, Lease> leases =
new HashMap<LeaseName, Lease>();
protected final TreeSet<Lease> sortedLeases = new TreeSet<Lease>();
protected boolean running = true;
/**
* Creates a lease
*
* @param leasePeriod - length of time (milliseconds) that the lease is valid
* @param leaseCheckFrequency - how often the lease should be checked (milliseconds)
* @param leaseCheckFrequency - how often the lease should be checked
* (milliseconds)
*/
public Leases(long leasePeriod, long leaseCheckFrequency) {
this.leasePeriod = leasePeriod;
@@ -88,96 +87,93 @@ public class Leases {
LOG.debug("leases closed");
}
}
String getLeaseName(final Text holderId, final Text resourceId) {
return "<holderId=" + holderId + ", resourceId=" + resourceId + ">";
}
/** A client obtains a lease... */
/* A client obtains a lease... */
/**
* Obtain a lease
*
* @param holderId - name of lease holder
* @param resourceId - resource being leased
* @param listener - listener that will process lease expirations
* @param holderId id of lease holder
* @param resourceId id of resource being leased
* @param listener listener that will process lease expirations
*/
public void createLease(Text holderId, Text resourceId,
public void createLease(final long holderId, final long resourceId,
final LeaseListener listener) {
LeaseName name = null;
synchronized(leases) {
synchronized(sortedLeases) {
Lease lease = new Lease(holderId, resourceId, listener);
Text leaseId = lease.getLeaseId();
if(leases.get(leaseId) != null) {
throw new AssertionError("Impossible state for createLease(): Lease " +
getLeaseName(holderId, resourceId) + " is still held.");
name = lease.getLeaseName();
if(leases.get(name) != null) {
throw new AssertionError("Impossible state for createLease(): " +
"Lease " + name + " is still held.");
}
leases.put(leaseId, lease);
leases.put(name, lease);
sortedLeases.add(lease);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Created lease " + getLeaseName(holderId, resourceId));
LOG.debug("Created lease " + name);
}
}
/** A client renews a lease... */
/* A client renews a lease... */
/**
* Renew a lease
*
* @param holderId - name of lease holder
* @param resourceId - resource being leased
* @param holderId id of lease holder
* @param resourceId id of resource being leased
* @throws IOException
*/
public void renewLease(Text holderId, Text resourceId) throws IOException {
public void renewLease(final long holderId, final long resourceId)
throws IOException {
LeaseName name = null;
synchronized(leases) {
synchronized(sortedLeases) {
Text leaseId = createLeaseId(holderId, resourceId);
Lease lease = leases.get(leaseId);
if(lease == null) {
name = createLeaseName(holderId, resourceId);
Lease lease = leases.get(name);
if (lease == null) {
// It's possible that someone tries to renew the lease, but
// it just expired a moment ago. So fail.
throw new IOException("Cannot renew lease that is not held: " +
getLeaseName(holderId, resourceId));
name);
}
sortedLeases.remove(lease);
lease.renew();
sortedLeases.add(lease);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Renewed lease " + getLeaseName(holderId, resourceId));
LOG.debug("Renewed lease " + name);
}
}
/**
* Client explicitly cancels a lease.
*
* @param holderId - name of lease holder
* @param resourceId - resource being leased
* @param holderId id of lease holder
* @param resourceId id of resource being leased
* @throws IOException
*/
public void cancelLease(Text holderId, Text resourceId) throws IOException {
public void cancelLease(final long holderId, final long resourceId)
throws IOException {
LeaseName name = null;
synchronized(leases) {
synchronized(sortedLeases) {
Text leaseId = createLeaseId(holderId, resourceId);
Lease lease = leases.get(leaseId);
if(lease == null) {
name = createLeaseName(holderId, resourceId);
Lease lease = leases.get(name);
if (lease == null) {
// It's possible that someone tries to renew the lease, but
// it just expired a moment ago. So fail.
throw new IOException("Cannot cancel lease that is not held: " +
getLeaseName(holderId, resourceId));
name);
}
sortedLeases.remove(lease);
leases.remove(leaseId);
leases.remove(name);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Cancel lease " + getLeaseName(holderId, resourceId));
LOG.debug("Cancel lease " + name);
}
}
@@ -190,13 +186,10 @@ public class Leases {
Lease top;
while((sortedLeases.size() > 0)
&& ((top = sortedLeases.first()) != null)) {
if(top.shouldExpire()) {
leases.remove(top.getLeaseId());
leases.remove(top.getLeaseName());
sortedLeases.remove(top);
top.expired();
} else {
break;
}
@@ -206,34 +199,92 @@ public class Leases {
try {
Thread.sleep(leaseCheckFrequency);
} catch (InterruptedException ie) {
// Ignore
// continue
}
}
}
}
/*
* A Lease name.
* More lightweight than String or Text.
*/
class LeaseName implements Comparable {
private final long holderId;
private final long resourceId;
LeaseName(final long hid, final long rid) {
this.holderId = hid;
this.resourceId = rid;
}
@Override
public boolean equals(Object obj) {
LeaseName other = (LeaseName)obj;
return this.holderId == other.holderId &&
this.resourceId == other.resourceId;
}
@Override
public int hashCode() {
// Fold each long into an int the way Long#hashCode does (xor of halves).
int result = (int)(this.holderId ^ (this.holderId >>> 32));
result ^= (int)(this.resourceId ^ (this.resourceId >>> 32));
return result;
}
@Override
public String toString() {
return Long.toString(this.holderId) + "/" +
Long.toString(this.resourceId);
}
public int compareTo(Object obj) {
LeaseName other = (LeaseName)obj;
if (this.holderId < other.holderId) {
return -1;
}
if (this.holderId > other.holderId) {
return 1;
}
// holderIds are equal
if (this.resourceId < other.resourceId) {
return -1;
}
if (this.resourceId > other.resourceId) {
return 1;
}
// Objects are equal
return 0;
}
}
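A quick same-package check (illustrative) that LeaseName's identity, hash, and ordering all agree with the pair of longs it wraps:

class LeaseNameSketch {
  public static void main(String[] args) {
    LeaseName a = new LeaseName(1L, 2L);
    LeaseName b = new LeaseName(1L, 2L);
    System.out.println(a.equals(b));                            // true
    System.out.println(a.hashCode() == b.hashCode());           // true
    System.out.println(a.compareTo(new LeaseName(1L, 3L)) < 0); // true: resourceId breaks ties
  }
}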
/** Create a lease id out of the holder and resource ids. */
Text createLeaseId(Text holderId, Text resourceId) {
return new Text("_" + holderId + "/" + resourceId + "_");
protected LeaseName createLeaseName(final long hid, final long rid) {
return new LeaseName(hid, rid);
}
/** This class tracks a single Lease. */
@SuppressWarnings("unchecked")
private class Lease implements Comparable {
Text holderId;
Text resourceId;
LeaseListener listener;
final long holderId;
final long resourceId;
final LeaseListener listener;
long lastUpdate;
private LeaseName leaseId;
Lease(Text holderId, Text resourceId, LeaseListener listener) {
Lease(final long holderId, final long resourceId,
final LeaseListener listener) {
this.holderId = holderId;
this.resourceId = resourceId;
this.listener = listener;
renew();
}
Text getLeaseId() {
return createLeaseId(holderId, resourceId);
synchronized LeaseName getLeaseName() {
if (this.leaseId == null) {
this.leaseId = createLeaseName(holderId, resourceId);
}
return this.leaseId;
}
boolean shouldExpire() {
@@ -246,8 +297,7 @@ public class Leases {
void expired() {
if (LOG.isDebugEnabled()) {
LOG.debug("Lease expired " + getLeaseName(this.holderId,
this.resourceId));
LOG.debug("Lease expired " + getLeaseName());
}
listener.leaseExpired();
}
@@ -259,7 +309,7 @@ public class Leases {
@Override
public int hashCode() {
int result = this.getLeaseId().hashCode();
int result = this.getLeaseName().hashCode();
result ^= Long.valueOf(this.lastUpdate).hashCode();
return result;
}
@@ -272,14 +322,11 @@ public class Leases {
Lease other = (Lease) o;
if(this.lastUpdate < other.lastUpdate) {
return -1;
} else if(this.lastUpdate > other.lastUpdate) {
return 1;
} else {
return this.getLeaseId().compareTo(other.getLeaseId());
return this.getLeaseName().compareTo(other.getLeaseName());
}
}
}
}
}
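Putting the pieces together, a minimal sketch of the pattern; the timings and ids are arbitrary, and the listener fires only if renewals stop for longer than the lease period.

import java.io.IOException;

class LeasesSketch {
  public static void main(String[] args) throws IOException {
    Leases leases = new Leases(1000, 100); // 1s leases, checked every 100ms
    final long holderId = 1L;
    final long resourceId = 99L;
    leases.createLease(holderId, resourceId, new LeaseListener() {
      public void leaseExpired() {
        System.out.println("releasing resource " + resourceId);
      }
    });
    leases.renewLease(holderId, resourceId);  // the client's heartbeat
    leases.cancelLease(holderId, resourceId); // explicit, clean release
    leases.close();                           // shuts down the monitor thread
  }
}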

View File

@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Text;
/** Abstract base class for merge tests */
@@ -31,7 +31,7 @@ public abstract class AbstractMergeTestBase extends HBaseTestCase {
protected static final Text COLUMN_NAME = new Text("contents:");
protected Random rand;
protected HTableDescriptor desc;
protected BytesWritable value;
protected ImmutableBytesWritable value;
protected MiniDFSCluster dfsCluster;
protected FileSystem fs;
@@ -52,7 +52,7 @@ public abstract class AbstractMergeTestBase extends HBaseTestCase {
val.append(partialValue);
}
try {
value = new BytesWritable(val.toString().getBytes(HConstants.UTF8_ENCODING));
value = new ImmutableBytesWritable(val.toString().getBytes(HConstants.UTF8_ENCODING));
} catch(UnsupportedEncodingException e) {
fail();
@@ -125,7 +125,7 @@ public abstract class AbstractMergeTestBase extends HBaseTestCase {
long lockid = region.startUpdate(new Text("row_"
+ String.format("%1$05d", i)));
region.put(lockid, COLUMN_NAME, value);
region.put(lockid, COLUMN_NAME, value.get());
region.commit(lockid);
if(i % 10000 == 0) {
System.out.println("Flushing write #" + i);

View File

@@ -21,41 +21,44 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
/** Test case for get */
public class TestGet extends HBaseTestCase {
private static final Log LOG = LogFactory.getLog(TestGet.class.getName());
private static final Text CONTENTS = new Text("contents:");
private static final Text ROW_KEY = new Text(HGlobals.rootRegionInfo.regionName);
private static final Text ROW_KEY =
new Text(HGlobals.rootRegionInfo.regionName);
private static final String SERVER_ADDRESS = "foo.bar.com:1234";
private void verifyGet(HRegion r) throws IOException {
private void verifyGet(final HRegion r, final String expectedServer)
throws IOException {
// This should return a value because there is only one family member
BytesWritable value = r.get(ROW_KEY, CONTENTS);
byte [] value = r.get(ROW_KEY, CONTENTS);
assertNotNull(value);
// This should not return a value because there are multiple family members
value = r.get(ROW_KEY, HConstants.COLUMN_FAMILY);
assertNull(value);
// Find out what getFull returns
TreeMap<Text, byte []> values = r.getFull(ROW_KEY);
TreeMap<Text, BytesWritable> values = r.getFull(ROW_KEY);
//assertEquals(4, values.keySet().size());
// assertEquals(4, values.keySet().size());
for(Iterator<Text> i = values.keySet().iterator(); i.hasNext(); ) {
Text column = i.next();
System.out.println(column);
if(column.equals(HConstants.COL_SERVER)) {
BytesWritable val = values.get(column);
byte[] bytes = new byte[val.getSize()];
System.arraycopy(val.get(), 0, bytes, 0, bytes.length);
System.out.println(" " + new String(bytes, HConstants.UTF8_ENCODING));
if (column.equals(HConstants.COL_SERVER)) {
byte [] val = values.get(column);
String server = new String(val, HConstants.UTF8_ENCODING);
assertEquals(expectedServer, server);
LOG.info(server);
}
}
}
@@ -94,38 +97,35 @@ public class TestGet extends HBaseTestCase {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
DataOutputStream s = new DataOutputStream(bytes);
CONTENTS.write(s);
r.put(lockid, CONTENTS, new BytesWritable(bytes.toByteArray()));
r.put(lockid, CONTENTS, bytes.toByteArray());
bytes.reset();
HGlobals.rootRegionInfo.write(s);
r.put(lockid, HConstants.COL_REGIONINFO, new BytesWritable(bytes.toByteArray()));
r.put(lockid, HConstants.COL_REGIONINFO, bytes.toByteArray());
r.commit(lockid);
lockid = r.startUpdate(ROW_KEY);
r.put(lockid, HConstants.COL_SERVER,
new BytesWritable(
new HServerAddress("foo.bar.com:1234").toString().getBytes(HConstants.UTF8_ENCODING)
)
new HServerAddress(SERVER_ADDRESS).toString().
getBytes(HConstants.UTF8_ENCODING)
);
r.put(lockid, HConstants.COL_STARTCODE,
new BytesWritable(
String.valueOf(lockid).getBytes(HConstants.UTF8_ENCODING)
)
String.valueOf(lockid).getBytes(HConstants.UTF8_ENCODING)
);
r.put(lockid, new Text(HConstants.COLUMN_FAMILY + "region"),
new BytesWritable("region".getBytes(HConstants.UTF8_ENCODING)));
"region".getBytes(HConstants.UTF8_ENCODING));
r.commit(lockid);
// Verify that get works the same from memcache as when reading from disk
// NOTE dumpRegion won't work here because it only reads from disk.
verifyGet(r);
verifyGet(r, SERVER_ADDRESS);
// Close and re-open region, forcing updates to disk
@@ -135,27 +135,26 @@ public class TestGet extends HBaseTestCase {
// Read it back
verifyGet(r);
verifyGet(r, SERVER_ADDRESS);
// Update one family member and add a new one
lockid = r.startUpdate(ROW_KEY);
r.put(lockid, new Text(HConstants.COLUMN_FAMILY + "region"),
new BytesWritable("region2".getBytes()));
"region2".getBytes());
String otherServerName = "bar.foo.com:4321";
r.put(lockid, HConstants.COL_SERVER,
new BytesWritable(
new HServerAddress("bar.foo.com:4321").toString().getBytes(HConstants.UTF8_ENCODING)
)
);
new HServerAddress(otherServerName).toString().
getBytes(HConstants.UTF8_ENCODING));
r.put(lockid, new Text(HConstants.COLUMN_FAMILY + "junk"),
new BytesWritable("junk".getBytes()));
"junk".getBytes());
r.commit(lockid);
verifyGet(r);
verifyGet(r, otherServerName);
// Close region and re-open it
@ -165,7 +164,7 @@ public class TestGet extends HBaseTestCase {
// Read it back
verifyGet(r);
verifyGet(r, otherServerName);
// Close region once and for all
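The TestGet hunks above all reduce to one before/after idiom: get() now hands back an exact-length byte [] instead of a BytesWritable. A minimal sketch of the two decode paths, assuming only the BytesWritable accessors (get(), getSize()) already visible in the removed lines:

import org.apache.hadoop.io.BytesWritable;

public class DecodeSketch {
  /** Old idiom: copy out of the BytesWritable first, because get()
   *  returns the internal buffer, which may be larger than getSize(). */
  static String decodeOld(final BytesWritable value, final String encoding)
      throws java.io.UnsupportedEncodingException {
    byte [] bytes = new byte[value.getSize()];
    System.arraycopy(value.get(), 0, bytes, 0, bytes.length);
    return new String(bytes, encoding);
  }

  /** New idiom: the value is an exact-length byte [], decoded
   *  directly with no intermediate copy. */
  static String decodeNew(final byte [] value, final String encoding)
      throws java.io.UnsupportedEncodingException {
    return new String(value, encoding);
  }
}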

View File

@ -41,19 +41,14 @@ public class TestHBaseCluster extends HBaseClusterTestCase {
* Since all the "tests" depend on the results of the previous test, they are
 * not JUnit tests that can stand alone. Consequently we have a single JUnit
* test that runs the "sub-tests" as private methods.
* @throws IOException
*/
public void testHBaseCluster() {
try {
setup();
basic();
scanner();
listTables();
cleanup();
} catch(IOException e) {
e.printStackTrace();
fail();
}
public void testHBaseCluster() throws IOException {
setup();
basic();
scanner();
listTables();
cleanup();
}
public void tearDown() throws Exception {
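The TestHBaseCluster hunk swaps a try/catch/fail wrapper for a throws clause. Under JUnit 3 the runner records a propagated exception as an error with its full stack trace, which the swallowing pattern loses. A minimal sketch of the two shapes (class and method names are placeholders):

import java.io.IOException;
import junit.framework.TestCase;

public class ThrowsSketch extends TestCase {
  /** Declared throws: JUnit 3 records the exception as an error,
   *  stack trace intact. */
  public void testPropagates() throws IOException {
    doWork();
  }

  /** The replaced shape: the cause is printed, but the test itself
   *  only reports a bare AssertionFailedError from fail(). */
  public void testSwallows() {
    try {
      doWork();
    } catch (IOException e) {
      e.printStackTrace();
      fail();
    }
  }

  private void doWork() throws IOException {
    // stand-in for setup(); basic(); scanner(); listTables(); cleanup();
  }
}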

View File

@ -20,7 +20,6 @@ import java.util.TreeMap;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.Reader;
@ -50,10 +49,10 @@ public class TestHLog extends HBaseTestCase implements HConstants {
try {
// Write columns named 1, 2, 3, etc. and then values of single byte
// 1, 2, 3...
TreeMap<Text, BytesWritable> cols = new TreeMap<Text, BytesWritable>();
TreeMap<Text, byte []> cols = new TreeMap<Text, byte []>();
for (int i = 0; i < COL_COUNT; i++) {
cols.put(new Text(Integer.toString(i)),
new BytesWritable(new byte[] { (byte)(i + '0') }));
new byte[] { (byte)(i + '0') });
}
long timestamp = System.currentTimeMillis();
log.append(regionName, tableName, row, cols, timestamp);
@ -71,7 +70,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
assertEquals(regionName, key.getRegionName());
assertEquals(tableName, key.getTablename());
assertEquals(row, key.getRow());
assertEquals((byte)(i + '0'), val.getVal().get()[0]);
assertEquals((byte)(i + '0'), val.getVal()[0]);
System.out.println(key + " " + val);
}
while (reader.next(key, val)) {
@ -80,7 +79,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
assertEquals(tableName, key.getTablename());
assertEquals(HLog.METAROW, key.getRow());
assertEquals(HLog.METACOLUMN, val.getColumn());
assertEquals(0, val.getVal().compareTo(COMPLETE_CACHEFLUSH));
assertEquals(0, COMPLETE_CACHEFLUSH.compareTo(val.getVal()));
System.out.println(key + " " + val);
}
} finally {
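The flipped compareTo in the last TestHLog hunk falls out of the type change: a raw byte [] has no compareTo, so the remaining comparable constant has to become the receiver. A sketch of the alternative, a standalone lexicographic compare over raw arrays (helper names are made up):

import java.util.Arrays;

public class ByteCompareSketch {
  /** Unsigned, lexicographic compare of two raw byte arrays. */
  static int compareBytes(final byte [] left, final byte [] right) {
    int n = Math.min(left.length, right.length);
    for (int i = 0; i < n; i++) {
      int diff = (left[i] & 0xff) - (right[i] & 0xff);
      if (diff != 0) {
        return diff;
      }
    }
    return left.length - right.length;
  }

  /** Plain equality needs no helper at all. */
  static boolean equalsBytes(final byte [] left, final byte [] right) {
    return Arrays.equals(left, right);
  }
}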

View File

@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HMemcache.Snapshot;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
/** memcache test case */
@ -85,10 +84,10 @@ public class TestHMemcache extends TestCase {
*/
private void addRows(final HMemcache hmc) {
for (int i = 0; i < ROW_COUNT; i++) {
TreeMap<Text, BytesWritable> columns = new TreeMap<Text, BytesWritable>();
TreeMap<Text, byte []> columns = new TreeMap<Text, byte []>();
for (int ii = 0; ii < COLUMNS_COUNT; ii++) {
Text k = getColumnName(i, ii);
columns.put(k, new BytesWritable(k.toString().getBytes()));
columns.put(k, k.toString().getBytes());
}
hmc.add(getRowName(i), columns, System.currentTimeMillis());
}
@ -111,7 +110,7 @@ public class TestHMemcache extends TestCase {
throws IOException {
// Save off old state.
int oldHistorySize = hmc.history.size();
TreeMap<HStoreKey, BytesWritable> oldMemcache = hmc.memcache;
TreeMap<HStoreKey, byte []> oldMemcache = hmc.memcache;
// Run snapshot.
Snapshot s = hmc.snapshotMemcacheForLog(log);
// Make some assertions about what just happened.
@ -147,7 +146,7 @@ public class TestHMemcache extends TestCase {
}
private void isExpectedRow(final int rowIndex,
TreeMap<Text, BytesWritable> row) {
TreeMap<Text, byte []> row) {
int i = 0;
for (Text colname: row.keySet()) {
String expectedColname =
@ -158,10 +157,8 @@ public class TestHMemcache extends TestCase {
// 100 bytes in size at least. This is the default size
      // for BytesWritable. For comparison, convert bytes to
// String and trim to remove trailing null bytes.
BytesWritable value = row.get(colname);
byte[] bytes = new byte[value.getSize()];
System.arraycopy(value.get(), 0, bytes, 0, bytes.length);
String colvalueStr = new String(bytes).trim();
byte [] value = row.get(colname);
String colvalueStr = new String(value).trim();
assertEquals("Content", colnameStr, colvalueStr);
}
}
@ -171,7 +168,7 @@ public class TestHMemcache extends TestCase {
addRows(this.hmemcache);
for (int i = 0; i < ROW_COUNT; i++) {
HStoreKey hsk = new HStoreKey(getRowName(i));
TreeMap<Text, BytesWritable> all = this.hmemcache.getFull(hsk);
TreeMap<Text, byte []> all = this.hmemcache.getFull(hsk);
isExpectedRow(i, all);
}
}
@ -192,16 +189,16 @@ public class TestHMemcache extends TestCase {
HInternalScannerInterface scanner =
this.hmemcache.getScanner(timestamp, cols, new Text());
HStoreKey key = new HStoreKey();
TreeMap<Text, BytesWritable> results = new TreeMap<Text, BytesWritable>();
TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
for (int i = 0; scanner.next(key, results); i++) {
assertTrue("Row name",
key.toString().startsWith(getRowName(i).toString()));
assertEquals("Count of columns", COLUMNS_COUNT,
results.size());
TreeMap<Text, BytesWritable> row = new TreeMap<Text, BytesWritable>();
for(Iterator<Map.Entry<Text, BytesWritable>> it = results.entrySet().iterator();
TreeMap<Text, byte []> row = new TreeMap<Text, byte []>();
for(Iterator<Map.Entry<Text, byte []>> it = results.entrySet().iterator();
it.hasNext(); ) {
Map.Entry<Text, BytesWritable> e = it.next();
Map.Entry<Text, byte []> e = it.next();
row.put(e.getKey(), e.getValue());
}
isExpectedRow(i, row);
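The TestHMemcache hunks keep the trim() even with exact-length arrays, since (per the comment above) values sized by the old BytesWritable default can carry trailing null bytes. A sketch of the row-building and verification idiom over the new TreeMap<Text, byte []> shape (the column-name format is made up):

import java.util.TreeMap;
import org.apache.hadoop.io.Text;

public class MemcacheRowSketch {
  /** Build a row of columns mapped straight to raw values, with no
   *  wrapping writable per entry. */
  static TreeMap<Text, byte []> makeRow(final int columns) {
    TreeMap<Text, byte []> row = new TreeMap<Text, byte []>();
    for (int ii = 0; ii < columns; ii++) {
      Text name = new Text("family:column_" + ii);
      row.put(name, name.toString().getBytes());
    }
    return row;
  }

  /** Check each value round-trips; trim() guards against padding from
   *  values written under the old BytesWritable default sizing. */
  static void verifyRow(final TreeMap<Text, byte []> row) {
    for (Text colname : row.keySet()) {
      String colvalueStr = new String(row.get(colname)).trim();
      if (!colname.toString().equals(colvalueStr)) {
        throw new AssertionError("Content mismatch at " + colname);
      }
    }
  }
}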

View File

@ -25,7 +25,6 @@ import java.util.TreeMap;
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
@ -116,11 +115,8 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
for (int k = FIRST_ROW; k <= NUM_VALS; k++) {
long writeid = region.startUpdate(new Text("row_" + k));
region.put(writeid, CONTENTS_BASIC,
new BytesWritable((CONTENTSTR + k).getBytes()));
region.put(writeid, new Text(ANCHORNUM + k),
new BytesWritable((ANCHORSTR + k).getBytes()));
region.put(writeid, CONTENTS_BASIC, (CONTENTSTR + k).getBytes());
region.put(writeid, new Text(ANCHORNUM + k), (ANCHORSTR + k).getBytes());
region.commit(writeid);
}
System.out.println("Write " + NUM_VALS + " rows. Elapsed time: "
@ -143,20 +139,16 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
for (int k = FIRST_ROW; k <= NUM_VALS; k++) {
Text rowlabel = new Text("row_" + k);
BytesWritable bodydata = region.get(rowlabel, CONTENTS_BASIC);
byte [] bodydata = region.get(rowlabel, CONTENTS_BASIC);
assertNotNull(bodydata);
byte[] bytes = new byte[bodydata.getSize()];
System.arraycopy(bodydata.get(), 0, bytes, 0, bytes.length);
String bodystr = new String(bytes).toString().trim();
String bodystr = new String(bodydata).toString().trim();
String teststr = CONTENTSTR + k;
assertEquals("Incorrect value for key: (" + rowlabel + "," + CONTENTS_BASIC
+ "), expected: '" + teststr + "' got: '" + bodystr + "'",
bodystr, teststr);
collabel = new Text(ANCHORNUM + k);
bodydata = region.get(rowlabel, collabel);
bytes = new byte[bodydata.getSize()];
System.arraycopy(bodydata.get(), 0, bytes, 0, bytes.length);
bodystr = new String(bytes).toString().trim();
bodystr = new String(bodydata).toString().trim();
teststr = ANCHORSTR + k;
assertEquals("Incorrect value for key: (" + rowlabel + "," + collabel
+ "), expected: '" + teststr + "' got: '" + bodystr + "'",
@ -172,7 +164,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
// Try put with bad lockid.
boolean exceptionThrown = false;
try {
region.put(-1, CONTENTS_BASIC, new BytesWritable("bad input".getBytes()));
region.put(-1, CONTENTS_BASIC, "bad input".getBytes());
} catch (LockException e) {
exceptionThrown = true;
}
@ -185,7 +177,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
lockid = region.startUpdate(new Text("Some old key"));
String unregisteredColName = "FamilyGroup:FamilyLabel";
region.put(lockid, new Text(unregisteredColName),
new BytesWritable(unregisteredColName.getBytes()));
unregisteredColName.getBytes());
} catch (IOException e) {
exceptionThrown = true;
} finally {
@ -278,8 +270,8 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
String kLabel = String.format("%1$03d", k);
long lockid = region.startUpdate(new Text("row_vals1_" + kLabel));
region.put(lockid, cols[0], new BytesWritable(vals1[k].getBytes()));
region.put(lockid, cols[1], new BytesWritable(vals1[k].getBytes()));
region.put(lockid, cols[0], vals1[k].getBytes());
region.put(lockid, cols[1], vals1[k].getBytes());
region.commit(lockid);
numInserted += 2;
}
@ -295,16 +287,13 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
int numFetched = 0;
try {
HStoreKey curKey = new HStoreKey();
TreeMap<Text, BytesWritable> curVals = new TreeMap<Text, BytesWritable>();
TreeMap<Text, byte []> curVals = new TreeMap<Text, byte []>();
int k = 0;
while(s.next(curKey, curVals)) {
for(Iterator<Text> it = curVals.keySet().iterator(); it.hasNext(); ) {
Text col = it.next();
BytesWritable val = curVals.get(col);
byte[] bytes = new byte[val.getSize()];
System.arraycopy(val.get(), 0, bytes, 0, bytes.length);
int curval = Integer.parseInt(new String(bytes).trim());
byte [] val = curVals.get(col);
int curval = Integer.parseInt(new String(val).trim());
for(int j = 0; j < cols.length; j++) {
if(col.compareTo(cols[j]) == 0) {
assertEquals("Error at:" + curKey.getRow() + "/" + curKey.getTimestamp()
@ -343,16 +332,13 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
numFetched = 0;
try {
HStoreKey curKey = new HStoreKey();
TreeMap<Text, BytesWritable> curVals = new TreeMap<Text, BytesWritable>();
TreeMap<Text, byte []> curVals = new TreeMap<Text, byte []>();
int k = 0;
while(s.next(curKey, curVals)) {
for(Iterator<Text> it = curVals.keySet().iterator(); it.hasNext(); ) {
Text col = it.next();
BytesWritable val = curVals.get(col);
byte[] bytes = new byte[val.getSize()];
System.arraycopy(val.get(), 0, bytes, 0, bytes.length);
int curval = Integer.parseInt(new String(bytes).trim());
byte [] val = curVals.get(col);
int curval = Integer.parseInt(new String(val).trim());
for(int j = 0; j < cols.length; j++) {
if(col.compareTo(cols[j]) == 0) {
assertEquals("Error at:" + curKey.getRow() + "/" + curKey.getTimestamp()
@ -382,8 +368,8 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
String kLabel = String.format("%1$03d", k);
long lockid = region.startUpdate(new Text("row_vals1_" + kLabel));
region.put(lockid, cols[0], new BytesWritable(vals1[k].getBytes()));
region.put(lockid, cols[1], new BytesWritable(vals1[k].getBytes()));
region.put(lockid, cols[0], vals1[k].getBytes());
region.put(lockid, cols[1], vals1[k].getBytes());
region.commit(lockid);
numInserted += 2;
}
@ -399,16 +385,13 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
numFetched = 0;
try {
HStoreKey curKey = new HStoreKey();
TreeMap<Text, BytesWritable> curVals = new TreeMap<Text, BytesWritable>();
TreeMap<Text, byte []> curVals = new TreeMap<Text, byte []>();
int k = 0;
while(s.next(curKey, curVals)) {
for(Iterator<Text> it = curVals.keySet().iterator(); it.hasNext(); ) {
Text col = it.next();
BytesWritable val = curVals.get(col);
byte[] bytes = new byte[val.getSize()];
System.arraycopy(val.get(), 0, bytes, 0, bytes.length);
int curval = Integer.parseInt(new String(bytes).trim());
byte [] val = curVals.get(col);
int curval = Integer.parseInt(new String(val).trim());
for(int j = 0; j < cols.length; j++) {
if(col.compareTo(cols[j]) == 0) {
assertEquals("Error at:" + curKey.getRow() + "/" + curKey.getTimestamp()
@ -447,16 +430,13 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
numFetched = 0;
try {
HStoreKey curKey = new HStoreKey();
TreeMap<Text, BytesWritable> curVals = new TreeMap<Text, BytesWritable>();
TreeMap<Text, byte []> curVals = new TreeMap<Text, byte []>();
int k = 0;
while(s.next(curKey, curVals)) {
for(Iterator<Text> it = curVals.keySet().iterator(); it.hasNext(); ) {
Text col = it.next();
BytesWritable val = curVals.get(col);
byte[] bytes = new byte[val.getSize()];
System.arraycopy(val.get(), 0, bytes, 0, bytes.length);
int curval = Integer.parseInt(new String(bytes).trim());
byte [] val = curVals.get(col);
int curval = Integer.parseInt(new String(val).trim());
for (int j = 0; j < cols.length; j++) {
if (col.compareTo(cols[j]) == 0) {
assertEquals("Value for " + col + " should be: " + k
@ -485,16 +465,13 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
numFetched = 0;
try {
HStoreKey curKey = new HStoreKey();
TreeMap<Text, BytesWritable> curVals = new TreeMap<Text, BytesWritable>();
TreeMap<Text, byte []> curVals = new TreeMap<Text, byte []>();
int k = 500;
while(s.next(curKey, curVals)) {
for(Iterator<Text> it = curVals.keySet().iterator(); it.hasNext(); ) {
Text col = it.next();
BytesWritable val = curVals.get(col);
byte[] bytes = new byte[val.getSize()];
System.arraycopy(val.get(), 0, bytes, 0, bytes.length);
int curval = Integer.parseInt(new String(bytes).trim());
byte [] val = curVals.get(col);
int curval = Integer.parseInt(new String(val).trim());
for (int j = 0; j < cols.length; j++) {
if (col.compareTo(cols[j]) == 0) {
assertEquals("Value for " + col + " should be: " + k
@ -543,7 +520,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
// Write to the HRegion
long writeid = region.startUpdate(new Text("row_" + k));
region.put(writeid, CONTENTS_BODY, new BytesWritable(buf1.toString().getBytes()));
region.put(writeid, CONTENTS_BODY, buf1.toString().getBytes());
region.commit(writeid);
if (k > 0 && k % (N_ROWS / 100) == 0) {
System.out.println("Flushing write #" + k);
@ -660,15 +637,13 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
int contentsFetched = 0;
int anchorFetched = 0;
HStoreKey curKey = new HStoreKey();
TreeMap<Text, BytesWritable> curVals = new TreeMap<Text, BytesWritable>();
TreeMap<Text, byte []> curVals = new TreeMap<Text, byte []>();
int k = 0;
while(s.next(curKey, curVals)) {
for(Iterator<Text> it = curVals.keySet().iterator(); it.hasNext(); ) {
Text col = it.next();
BytesWritable val = curVals.get(col);
byte[] bytes = new byte[val.getSize()];
System.arraycopy(val.get(), 0, bytes, 0, bytes.length);
String curval = new String(bytes).trim();
byte [] val = curVals.get(col);
String curval = new String(val).trim();
if(col.compareTo(CONTENTS_BASIC) == 0) {
assertTrue("Error at:" + curKey.getRow() + "/" + curKey.getTimestamp()
@ -715,15 +690,13 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
try {
int numFetched = 0;
HStoreKey curKey = new HStoreKey();
TreeMap<Text, BytesWritable> curVals = new TreeMap<Text, BytesWritable>();
TreeMap<Text, byte []> curVals = new TreeMap<Text, byte []>();
int k = 0;
while(s.next(curKey, curVals)) {
for(Iterator<Text> it = curVals.keySet().iterator(); it.hasNext(); ) {
Text col = it.next();
BytesWritable val = curVals.get(col);
byte[] bytes = new byte[val.getSize()];
System.arraycopy(val.get(), 0, bytes, 0, bytes.length);
int curval = Integer.parseInt(new String(bytes).trim());
byte [] val = curVals.get(col);
int curval = Integer.parseInt(new String(val).trim());
for (int j = 0; j < cols.length; j++) {
if (col.compareTo(cols[j]) == 0) {
@ -754,13 +727,12 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
try {
int numFetched = 0;
HStoreKey curKey = new HStoreKey();
TreeMap<Text, BytesWritable> curVals = new TreeMap<Text, BytesWritable>();
TreeMap<Text, byte []> curVals = new TreeMap<Text, byte []>();
int k = 0;
while(s.next(curKey, curVals)) {
for(Iterator<Text> it = curVals.keySet().iterator(); it.hasNext(); ) {
Text col = it.next();
BytesWritable val = curVals.get(col);
byte [] val = curVals.get(col);
assertTrue(col.compareTo(CONTENTS_BODY) == 0);
assertNotNull(val);
numFetched++;
@ -792,7 +764,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
try {
int fetched = 0;
HStoreKey curKey = new HStoreKey();
TreeMap<Text, BytesWritable> curVals = new TreeMap<Text, BytesWritable>();
TreeMap<Text, byte []> curVals = new TreeMap<Text, byte []>();
while(s.next(curKey, curVals)) {
for(Iterator<Text> it = curVals.keySet().iterator(); it.hasNext(); ) {
it.next();
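The same decode-and-parse loop recurs across the TestHRegion scanner hunks above. A sketch of that idiom factored into helpers over the new TreeMap<Text, byte []> result shape (helper names are made up):

import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.io.Text;

public class ScanLoopSketch {
  /** The repeated idiom from the hunks: each scanned value is an
   *  exact-length byte [], decoded and trimmed before parsing. */
  static int parseValue(final byte [] val) {
    return Integer.parseInt(new String(val).trim());
  }

  /** Walk one row's results, accumulating values for a given column. */
  static int sumColumn(final TreeMap<Text, byte []> curVals, final Text col) {
    int total = 0;
    for (Map.Entry<Text, byte []> e : curVals.entrySet()) {
      if (e.getKey().compareTo(col) == 0) {
        total += parseValue(e.getValue());
      }
    }
    return total;
  }
}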

View File

@ -24,7 +24,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
@ -52,8 +51,8 @@ public class TestScanner extends HBaseTestCase {
private DataInputBuffer in = new DataInputBuffer();
/** Compare the HRegionInfo we read from HBase to what we stored */
private void validateRegionInfo(BytesWritable regionBytes) throws IOException {
in.reset(regionBytes.get(), regionBytes.getSize());
private void validateRegionInfo(byte [] regionBytes) throws IOException {
in.reset(regionBytes, regionBytes.length);
HRegionInfo info = new HRegionInfo();
info.readFields(in);
@ -69,7 +68,7 @@ public class TestScanner extends HBaseTestCase {
throws IOException {
HInternalScannerInterface scanner = null;
TreeMap<Text, BytesWritable> results = new TreeMap<Text, BytesWritable>();
TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
HStoreKey key = new HStoreKey();
Text[][] scanColumns = {
@ -82,21 +81,15 @@ public class TestScanner extends HBaseTestCase {
scanner = region.getScanner(scanColumns[i], FIRST_ROW);
while(scanner.next(key, results)) {
assertTrue(results.containsKey(HConstants.COL_REGIONINFO));
BytesWritable val = results.get(HConstants.COL_REGIONINFO);
byte[] bytes = new byte[val.getSize()];
System.arraycopy(val.get(), 0, bytes, 0, bytes.length);
validateRegionInfo(new BytesWritable(bytes));
byte [] val = results.get(HConstants.COL_REGIONINFO);
validateRegionInfo(val);
if(validateStartcode) {
assertTrue(results.containsKey(HConstants.COL_STARTCODE));
val = results.get(HConstants.COL_STARTCODE);
assertNotNull(val);
bytes = new byte[val.getSize()];
System.arraycopy(val.get(), 0, bytes, 0, bytes.length);
assertFalse(bytes.length == 0);
assertFalse(val.length == 0);
long startCode =
Long.valueOf(new String(bytes, HConstants.UTF8_ENCODING));
Long.valueOf(new String(val, HConstants.UTF8_ENCODING));
assertEquals(START_CODE, startCode);
}
@ -104,10 +97,8 @@ public class TestScanner extends HBaseTestCase {
assertTrue(results.containsKey(HConstants.COL_SERVER));
val = results.get(HConstants.COL_SERVER);
assertNotNull(val);
bytes = new byte[val.getSize()];
System.arraycopy(val.get(), 0, bytes, 0, bytes.length);
assertFalse(bytes.length == 0);
String server = new String(bytes, HConstants.UTF8_ENCODING);
assertFalse(val.length == 0);
String server = new String(val, HConstants.UTF8_ENCODING);
assertEquals(0, server.compareTo(serverName));
}
results.clear();
@ -128,7 +119,7 @@ public class TestScanner extends HBaseTestCase {
/** Use get to retrieve the HRegionInfo and validate it */
private void getRegionInfo() throws IOException {
BytesWritable bytes = region.get(ROW_KEY, HConstants.COL_REGIONINFO);
byte [] bytes = region.get(ROW_KEY, HConstants.COL_REGIONINFO);
validateRegionInfo(bytes);
}
@ -163,8 +154,7 @@ public class TestScanner extends HBaseTestCase {
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
DataOutputStream s = new DataOutputStream(byteStream);
HGlobals.rootRegionInfo.write(s);
region.put(lockid, HConstants.COL_REGIONINFO,
new BytesWritable(byteStream.toByteArray()));
region.put(lockid, HConstants.COL_REGIONINFO, byteStream.toByteArray());
region.commit(lockid);
// What we just committed is in the memcache. Verify that we can get
@ -191,11 +181,10 @@ public class TestScanner extends HBaseTestCase {
lockid = region.startUpdate(ROW_KEY);
region.put(lockid, HConstants.COL_SERVER,
new BytesWritable(address.toString().getBytes(HConstants.UTF8_ENCODING)));
address.toString().getBytes(HConstants.UTF8_ENCODING));
region.put(lockid, HConstants.COL_STARTCODE,
new BytesWritable(
String.valueOf(START_CODE).getBytes(HConstants.UTF8_ENCODING)));
String.valueOf(START_CODE).getBytes(HConstants.UTF8_ENCODING));
region.commit(lockid);
@ -232,7 +221,7 @@ public class TestScanner extends HBaseTestCase {
lockid = region.startUpdate(ROW_KEY);
region.put(lockid, HConstants.COL_SERVER,
new BytesWritable(address.toString().getBytes(HConstants.UTF8_ENCODING)));
address.toString().getBytes(HConstants.UTF8_ENCODING));
region.commit(lockid);
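validateRegionInfo now rehydrates the HRegionInfo by resetting a DataInputBuffer over the raw array. The round trip generalizes to any Writable; a self-contained sketch using Text as the payload:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class WritableRoundTripSketch {
  /** Serialize a Writable to a raw byte [] (the form now stored). */
  static byte [] toBytes(final Writable w) throws IOException {
    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
    DataOutputStream s = new DataOutputStream(byteStream);
    w.write(s);
    return byteStream.toByteArray();
  }

  /** Rehydrate it the way validateRegionInfo now does: reset a
   *  DataInputBuffer over the array and readFields from it. */
  static void fromBytes(final byte [] bytes, final Writable w)
      throws IOException {
    DataInputBuffer in = new DataInputBuffer();
    in.reset(bytes, bytes.length);
    w.readFields(in);
  }

  public static void main(String[] args) throws IOException {
    Text original = new Text("sample");
    Text copy = new Text();
    fromBytes(toBytes(original), copy);
    System.out.println(copy); // prints "sample"
  }
}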

View File

@ -89,10 +89,7 @@ public class TestScanner2 extends HBaseClusterTestCase {
}
for (int i = 0; i < values.length; i++) {
byte[] bytes = new byte[values[i].getData().getSize()];
System.arraycopy(values[i].getData().get(), 0, bytes, 0,
bytes.length);
results.put(values[i].getKey().getColumn(), bytes);
results.put(values[i].getKey().getColumn(), values[i].getData());
}
HRegionInfo info = HRegion.getRegionInfo(results);
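With getData() returning a raw byte [], the per-entry allocate-and-arraycopy in the old TestScanner2 loop disappears entirely. A sketch of the collection step, with a made-up KeyedValue stand-in for the scanner's key/value pair:

import java.util.TreeMap;
import org.apache.hadoop.io.Text;

public class ResultsMapSketch {
  /** Hypothetical stand-in for the pair the scanner returns. */
  static class KeyedValue {
    final Text column;
    final byte [] data;
    KeyedValue(final Text column, final byte [] data) {
      this.column = column;
      this.data = data;
    }
  }

  /** With values already raw byte [], each array is stored as-is;
   *  no intermediate buffer is allocated or copied. */
  static TreeMap<Text, byte []> collect(final KeyedValue [] values) {
    TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
    for (int i = 0; i < values.length; i++) {
      results.put(values[i].column, values[i].data);
    }
    return results;
  }
}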