HADOOP-1538 Provide capability for client specified time stamps in HBase

HADOOP-1466 Clean up visibility and javadoc issues in HBase.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@554811 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jim Kellerman 2007-07-10 03:06:32 +00:00
parent 2e49c9451b
commit eaf0a7b154
18 changed files with 553 additions and 244 deletions

View File

@ -53,4 +53,7 @@ Trunk (unreleased changes)
31. HADOOP-1566 Key-making utility
32. HADOOP-1415 Provide configurable per-column bloom filters.
HADOOP-1466 Clean up visibility and javadoc issues in HBase.
33. HADOOP-1538 Provide capability for client specified time stamps in HBase
HADOOP-1466 Clean up visibility and javadoc issues in HBase.

View File

@ -74,6 +74,9 @@ public class HClient implements HConstants {
this.serverAddress = serverAddress;
}
/**
* {@inheritDoc}
*/
@Override
public String toString() {
return "address: " + this.serverAddress.toString() + ", regioninfo: " +
@ -312,7 +315,7 @@ public class HClient implements HConstants {
long scannerId = -1L;
try {
scannerId = server.openScanner(firstMetaServer.regionInfo.regionName,
REGIONINFO, tableName);
REGIONINFO, tableName, System.currentTimeMillis(), null);
KeyedData[] values = server.next(scannerId);
if(values == null || values.length == 0) {
break;
@ -417,7 +420,7 @@ public class HClient implements HConstants {
long scannerId = -1L;
try {
scannerId = server.openScanner(firstMetaServer.regionInfo.regionName,
REGIONINFO, tableName);
REGIONINFO, tableName, System.currentTimeMillis(), null);
boolean isenabled = false;
while(true) {
KeyedData[] values = server.next(scannerId);
@ -500,7 +503,7 @@ public class HClient implements HConstants {
long scannerId = -1L;
try {
scannerId = server.openScanner(firstMetaServer.regionInfo.regionName,
REGIONINFO, tableName);
REGIONINFO, tableName, System.currentTimeMillis(), null);
boolean disabled = false;
while(true) {
KeyedData[] values = server.next(scannerId);
@ -807,7 +810,8 @@ public class HClient implements HConstants {
long scannerId = -1L;
try {
scannerId =
server.openScanner(t.regionInfo.regionName, META_COLUMNS, tableName);
server.openScanner(t.regionInfo.regionName, META_COLUMNS, tableName,
System.currentTimeMillis(), null);
DataInputBuffer inbuf = new DataInputBuffer();
while(true) {
@ -963,7 +967,7 @@ public class HClient implements HConstants {
long scannerId = -1L;
try {
scannerId = server.openScanner(t.regionInfo.regionName,
META_COLUMNS, EMPTY_START_ROW);
META_COLUMNS, EMPTY_START_ROW, System.currentTimeMillis(), null);
DataInputBuffer inbuf = new DataInputBuffer();
while(true) {
@ -1180,9 +1184,23 @@ public class HClient implements HConstants {
* @throws IOException
*/
public synchronized HScannerInterface obtainScanner(Text[] columns,
Text startRow)
throws IOException {
return obtainScanner(columns, startRow, null);
Text startRow) throws IOException {
return obtainScanner(columns, startRow, System.currentTimeMillis(), null);
}
/**
* Get a scanner on the current table starting at the specified row.
* Return the specified columns.
*
* @param columns array of columns to return
* @param startRow starting row in table to scan
* @param timestamp only return results whose timestamp <= this value
* @return scanner
* @throws IOException
*/
public synchronized HScannerInterface obtainScanner(Text[] columns,
Text startRow, long timestamp) throws IOException {
return obtainScanner(columns, startRow, timestamp, null);
}
/**
@ -1196,14 +1214,30 @@ public class HClient implements HConstants {
* @throws IOException
*/
public synchronized HScannerInterface obtainScanner(Text[] columns,
Text startRow, RowFilterInterface filter)
throws IOException {
Text startRow, RowFilterInterface filter) throws IOException {
return obtainScanner(columns, startRow, System.currentTimeMillis(), filter);
}
/**
* Get a scanner on the current table starting at the specified row.
* Return the specified columns.
*
* @param columns array of columns to return
* @param startRow starting row in table to scan
* @param timestamp only return results whose timestamp <= this value
* @param filter a row filter using row-key regexp and/or column data filter.
* @return scanner
* @throws IOException
*/
public synchronized HScannerInterface obtainScanner(Text[] columns,
Text startRow, long timestamp, RowFilterInterface filter)
throws IOException {
if(this.tableServers == null) {
throw new IllegalStateException("Must open table first");
}
return new ClientScanner(columns, startRow, filter);
return new ClientScanner(columns, startRow, timestamp, filter);
}
/*
* @return General HClient RetryPolicy instance.
*/
@ -1361,8 +1395,21 @@ public class HClient implements HConstants {
* @throws IOException
*/
public void commit(long lockid) throws IOException {
commit(lockid, System.currentTimeMillis());
}
/**
* Finalize a row mutation
*
* @param lockid - lock id returned from startUpdate
* @param timestamp - time to associate with the change
* @throws IOException
*/
public void commit(long lockid, long timestamp) throws IOException {
try {
this.currentServer.commit(this.currentRegion, this.clientid, lockid);
this.currentServer.commit(this.currentRegion, this.clientid, lockid,
timestamp);
} finally {
this.currentServer = null;
this.currentRegion = null;
@ -1399,6 +1446,7 @@ public class HClient implements HConstants {
private final Text EMPTY_COLUMN = new Text();
private Text[] columns;
private Text startRow;
private long scanTime;
private boolean closed;
private RegionLocation[] regions;
@SuppressWarnings("hiding")
@ -1422,10 +1470,11 @@ public class HClient implements HConstants {
this.regions = info.toArray(new RegionLocation[info.size()]);
}
ClientScanner(Text[] columns, Text startRow, RowFilterInterface filter)
throws IOException {
ClientScanner(Text[] columns, Text startRow, long timestamp,
RowFilterInterface filter) throws IOException {
this.columns = columns;
this.startRow = startRow;
this.scanTime = timestamp;
this.closed = false;
this.filter = filter;
if (filter != null) {
@ -1462,11 +1511,12 @@ public class HClient implements HConstants {
if (this.filter == null) {
this.scannerId = this.server.openScanner(info.regionInfo.regionName,
this.columns, currentRegion == 0 ? this.startRow
: EMPTY_START_ROW);
: EMPTY_START_ROW, scanTime, null);
} else {
this.scannerId = this.server.openScanner(info.regionInfo.regionName,
this.columns, currentRegion == 0 ? this.startRow
: EMPTY_START_ROW, filter);
this.scannerId =
this.server.openScanner(info.regionInfo.regionName,
this.columns, currentRegion == 0 ? this.startRow
: EMPTY_START_ROW, scanTime, filter);
}
break;
@ -1488,8 +1538,8 @@ public class HClient implements HConstants {
return true;
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, java.util.TreeMap)
/**
* {@inheritDoc}
*/
public boolean next(HStoreKey key, TreeMap<Text, byte[]> results) throws IOException {
if(this.closed) {
@ -1511,8 +1561,8 @@ public class HClient implements HConstants {
return values == null ? false : values.length != 0;
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HScannerInterface#close()
/**
* {@inheritDoc}
*/
public void close() throws IOException {
if(this.scannerId != -1L) {

View File

@ -26,10 +26,22 @@ import org.apache.hadoop.io.Text;
* is specified) or if multiple members of the same column family were
* specified. If so, we need to ignore the timestamp to ensure that we get all
* the family members, as they may have been last updated at different times.
* This interface exposes two APIs for querying the scanner.
*/
public interface HInternalScannerInterface {
/**
* Grab the next row's worth of values. The HScanner will return the most
* recent data value for each row that is not newer than the target time.
*
* If a dataFilter is defined, it will be used to skip rows that do not
* match its criteria. It may cause the scanner to stop prematurely if it
* knows that it will no longer accept the remaining results.
*
* @param key HStoreKey containing row and timestamp
* @param results Map of column/value pairs
* @return true if a value was found
* @throws IOException
*/
public boolean next(HStoreKey key, TreeMap<Text, byte []> results)
throws IOException;
@ -38,9 +50,9 @@ public interface HInternalScannerInterface {
*/
public void close();
/** Returns true if the scanner is matching a column family or regex */
/** @return true if the scanner is matching a column family or regex */
public boolean isWildcardScanner();
/** Returns true if the scanner is matching multiple column family members */
/** @return true if the scanner is matching multiple column family members */
public boolean isMultipleMatchScanner();
}

View File

@ -51,8 +51,8 @@ import org.apache.hadoop.util.StringUtils;
public class HMaster implements HConstants, HMasterInterface,
HMasterRegionInterface, Runnable {
/* (non-Javadoc)
* @see org.apache.hadoop.ipc.VersionedProtocol#getProtocolVersion(java.lang.String, long)
/**
* {@inheritDoc}
*/
public long getProtocolVersion(String protocol,
@SuppressWarnings("unused") long clientVersion)
@ -169,7 +169,7 @@ public class HMaster implements HConstants, HMasterInterface,
try {
regionServer = client.getHRegionConnection(region.server);
scannerId = regionServer.openScanner(region.regionName, METACOLUMNS,
FIRST_ROW);
FIRST_ROW, System.currentTimeMillis(), null);
while (true) {
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
@ -278,6 +278,9 @@ public class HMaster implements HConstants, HMasterInterface,
* Scanner for the <code>ROOT</code> HRegion.
*/
class RootScanner extends BaseScanner {
/**
* {@inheritDoc}
*/
public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("Running ROOT scanner");
@ -338,11 +341,17 @@ public class HMaster implements HConstants, HMasterInterface,
Text regionName;
Text startKey;
/**
* {@inheritDoc}
*/
@Override
public boolean equals(Object o) {
return this.compareTo(o) == 0;
}
/**
* {@inheritDoc}
*/
@Override
public int hashCode() {
int result = this.regionName.hashCode();
@ -351,6 +360,10 @@ public class HMaster implements HConstants, HMasterInterface,
}
// Comparable
/**
* {@inheritDoc}
*/
public int compareTo(Object o) {
MetaRegion other = (MetaRegion)o;
@ -380,6 +393,9 @@ public class HMaster implements HConstants, HMasterInterface,
* action would prevent other work from getting done.
*/
class MetaScanner extends BaseScanner {
/**
* {@inheritDoc}
*/
@SuppressWarnings("null")
public void run() {
while (!closed) {
@ -804,8 +820,8 @@ public class HMaster implements HConstants, HMasterInterface,
// HMasterRegionInterface
//////////////////////////////////////////////////////////////////////////////
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HMasterRegionInterface#regionServerStartup(org.apache.hadoop.hbase.HServerInfo)
/**
* {@inheritDoc}
*/
@SuppressWarnings("unused")
public void regionServerStartup(HServerInfo serverInfo)
@ -838,8 +854,8 @@ public class HMaster implements HConstants, HMasterInterface,
return s.hashCode();
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HMasterRegionInterface#regionServerReport(org.apache.hadoop.hbase.HServerInfo, org.apache.hadoop.hbase.HMsg[])
/**
* {@inheritDoc}
*/
public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[])
throws IOException {
@ -1336,7 +1352,7 @@ public class HMaster implements HConstants, HMasterInterface,
}
server.delete(regionName, clientId, lockid, COL_SERVER);
server.delete(regionName, clientId, lockid, COL_STARTCODE);
server.commit(regionName, clientId, lockid);
server.commit(regionName, clientId, lockid, System.currentTimeMillis());
}
// Get regions reassigned
@ -1384,7 +1400,8 @@ public class HMaster implements HConstants, HMasterInterface,
try {
LOG.debug("scanning root region");
scannerId = server.openScanner(HGlobals.rootRegionInfo.regionName, columns, startRow);
scannerId = server.openScanner(HGlobals.rootRegionInfo.regionName,
columns, startRow, System.currentTimeMillis(), null);
scanMetaRegion(server, scannerId, HGlobals.rootRegionInfo.regionName);
break;
@ -1413,7 +1430,8 @@ public class HMaster implements HConstants, HMasterInterface,
server = client.getHRegionConnection(r.server);
scannerId = server.openScanner(r.regionName, columns, startRow);
scannerId = server.openScanner(r.regionName, columns, startRow,
System.currentTimeMillis(), null);
scanMetaRegion(server, scannerId, r.regionName);
}
@ -1513,7 +1531,9 @@ public class HMaster implements HConstants, HMasterInterface,
}
server.delete(metaRegionName, clientId, lockid, COL_SERVER);
server.delete(metaRegionName, clientId, lockid, COL_STARTCODE);
server.commit(metaRegionName, clientId, lockid);
server.commit(metaRegionName, clientId, lockid,
System.currentTimeMillis());
break;
} catch(NotServingRegionException e) {
@ -1622,7 +1642,9 @@ public class HMaster implements HConstants, HMasterInterface,
long lockid = server.startUpdate(metaRegionName, clientId, regionName);
server.put(metaRegionName, clientId, lockid, COL_SERVER, serverAddress);
server.put(metaRegionName, clientId, lockid, COL_STARTCODE, startCode);
server.commit(metaRegionName, clientId, lockid);
server.commit(metaRegionName, clientId, lockid,
System.currentTimeMillis());
break;
} catch(NotServingRegionException e) {
@ -1639,15 +1661,15 @@ public class HMaster implements HConstants, HMasterInterface,
// HMasterInterface
//////////////////////////////////////////////////////////////////////////////
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HMasterInterface#isMasterRunning()
/**
* {@inheritDoc}
*/
public boolean isMasterRunning() {
return !closed;
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HMasterInterface#shutdown()
/**
* {@inheritDoc}
*/
public void shutdown() {
TimerTask tt = new TimerTask() {
@ -1664,8 +1686,8 @@ public class HMaster implements HConstants, HMasterInterface,
t.schedule(tt, 10);
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HMasterInterface#createTable(org.apache.hadoop.hbase.HTableDescriptor)
/**
* {@inheritDoc}
*/
public void createTable(HTableDescriptor desc) throws IOException {
if (!isMasterRunning()) {
@ -1717,7 +1739,8 @@ public class HMaster implements HConstants, HMasterInterface,
long lockid = server.startUpdate(metaRegionName, clientId, regionName);
server.put(metaRegionName, clientId, lockid, COL_REGIONINFO,
byteValue.toByteArray());
server.commit(metaRegionName, clientId, lockid);
server.commit(metaRegionName, clientId, lockid,
System.currentTimeMillis());
// 4. Close the new region to flush it to disk
@ -1741,8 +1764,8 @@ public class HMaster implements HConstants, HMasterInterface,
}
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HMasterInterface#deleteTable(org.apache.hadoop.io.Text)
/**
* {@inheritDoc}
*/
public void deleteTable(Text tableName) throws IOException {
new TableDelete(tableName).process();
@ -1751,36 +1774,36 @@ public class HMaster implements HConstants, HMasterInterface,
}
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HMasterInterface#addColumn(org.apache.hadoop.io.Text, org.apache.hadoop.hbase.HColumnDescriptor)
/**
* {@inheritDoc}
*/
public void addColumn(Text tableName, HColumnDescriptor column) throws IOException {
new AddColumn(tableName, column).process();
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HMasterInterface#deleteColumn(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text)
/**
* {@inheritDoc}
*/
public void deleteColumn(Text tableName, Text columnName) throws IOException {
new DeleteColumn(tableName, HStoreKey.extractFamily(columnName)).process();
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HMasterInterface#enableTable(org.apache.hadoop.io.Text)
/**
* {@inheritDoc}
*/
public void enableTable(Text tableName) throws IOException {
new ChangeTableState(tableName, true).process();
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HMasterInterface#findRootRegion()
/**
* {@inheritDoc}
*/
public HServerAddress findRootRegion() {
return rootRegionLocation;
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HMasterInterface#disableTable(org.apache.hadoop.io.Text)
/**
* {@inheritDoc}
*/
public void disableTable(Text tableName) throws IOException {
new ChangeTableState(tableName, false).process();
@ -1837,7 +1860,8 @@ public class HMaster implements HConstants, HMasterInterface,
// Open a scanner on the meta region
long scannerId =
server.openScanner(m.regionName, METACOLUMNS, tableName);
server.openScanner(m.regionName, METACOLUMNS, tableName,
System.currentTimeMillis(), null);
try {
DataInputBuffer inbuf = new DataInputBuffer();
@ -1995,7 +2019,9 @@ public class HMaster implements HConstants, HMasterInterface,
updateRegionInfo(server, m.regionName, i);
server.delete(m.regionName, clientId, lockid, COL_SERVER);
server.delete(m.regionName, clientId, lockid, COL_STARTCODE);
server.commit(m.regionName, clientId, lockid);
server.commit(m.regionName, clientId, lockid,
System.currentTimeMillis());
lockid = -1L;
if(LOG.isDebugEnabled()) {
@ -2151,7 +2177,7 @@ public class HMaster implements HConstants, HMasterInterface,
lockid = server.startUpdate(regionName, clientId, i.regionName);
server.put(regionName, clientId, lockid, COL_REGIONINFO,
byteValue.toByteArray());
server.commit(regionName, clientId, lockid);
server.commit(regionName, clientId, lockid, System.currentTimeMillis());
lockid = -1L;
if(LOG.isDebugEnabled()) {
LOG.debug("updated columns in row: " + i.regionName);
@ -2252,8 +2278,8 @@ public class HMaster implements HConstants, HMasterInterface,
this.server = server;
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.LeaseListener#leaseExpired()
/**
* {@inheritDoc}
*/
public void leaseExpired() {
LOG.info(server + " lease expired");

View File

@ -46,15 +46,17 @@ public class HMemcache {
final HLocking lock = new HLocking();
/** constructor */
public HMemcache() {
super();
}
/** represents the state of the memcache at a specified point in time */
public static class Snapshot {
public TreeMap<HStoreKey, byte []> memcacheSnapshot = null;
public long sequenceId = 0;
TreeMap<HStoreKey, byte []> memcacheSnapshot = null;
long sequenceId = 0;
public Snapshot() {
Snapshot() {
super();
}
}
@ -72,7 +74,7 @@ public class HMemcache {
*
* @return frozen HMemcache TreeMap and HLog sequence number.
*/
public Snapshot snapshotMemcacheForLog(HLog log) throws IOException {
Snapshot snapshotMemcacheForLog(HLog log) throws IOException {
Snapshot retval = new Snapshot();
this.lock.obtainWriteLock();
@ -112,6 +114,7 @@ public class HMemcache {
* Delete the snapshot, remove from history.
*
* Modifying the structure means we need to obtain a writelock.
* @throws IOException
*/
public void deleteSnapshot() throws IOException {
this.lock.obtainWriteLock();
@ -251,6 +254,9 @@ public class HMemcache {
for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
HStoreKey itKey = es.getKey();
if (itKey.matchesRowCol(curKey)) {
if(HConstants.DELETE_BYTES.compareTo(es.getValue()) == 0) {
break;
}
result.add(tailMap.get(itKey));
curKey.setVersion(itKey.getTimestamp() - 1);
}
@ -264,7 +270,7 @@ public class HMemcache {
/**
* Return a scanner over the keys in the HMemcache
*/
public HInternalScannerInterface getScanner(long timestamp,
HInternalScannerInterface getScanner(long timestamp,
Text targetCols[], Text firstRow)
throws IOException {
return new HMemcacheScanner(timestamp, targetCols, firstRow);
@ -280,7 +286,7 @@ public class HMemcache {
final Iterator<HStoreKey> keyIterators[];
@SuppressWarnings("unchecked")
public HMemcacheScanner(long timestamp, Text targetCols[], Text firstRow)
HMemcacheScanner(long timestamp, Text targetCols[], Text firstRow)
throws IOException {
super(timestamp, targetCols);
@ -331,6 +337,7 @@ public class HMemcache {
* @param firstRow seek to this row
* @return true if this is the first row
*/
@Override
boolean findFirstRow(int i, Text firstRow) {
return firstRow.getLength() == 0 ||
keys[i].getRow().compareTo(firstRow) >= 0;
@ -342,6 +349,7 @@ public class HMemcache {
* @param i Which iterator to fetch next value from
* @return true if there is more data available
*/
@Override
boolean getNext(int i) {
if (!keyIterators[i].hasNext()) {
closeSubScanner(i);
@ -353,6 +361,7 @@ public class HMemcache {
}
/** Shut down an individual map iterator. */
@Override
void closeSubScanner(int i) {
keyIterators[i] = null;
keys[i] = null;
@ -361,6 +370,7 @@ public class HMemcache {
}
/** Shut down map iterators, and release the lock */
@Override
public void close() {
if(! scannerClosed) {
try {

View File

@ -316,7 +316,6 @@ class HMerge implements HConstants {
/** Instantiated to compact the meta region */
private static class OfflineMerger extends Merger {
private Path dir;
private TreeSet<HRegionInfo> metaRegions;
private TreeMap<Text, byte []> results;
@ -324,7 +323,6 @@ class HMerge implements HConstants {
throws IOException {
super(conf, fs, tableName);
this.dir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR));
this.metaRegions = new TreeSet<HRegionInfo>();
this.results = new TreeMap<Text, byte []>();
@ -334,7 +332,7 @@ class HMerge implements HConstants {
new HRegion(dir, hlog,fs, conf, HGlobals.rootRegionInfo, null);
HInternalScannerInterface rootScanner =
root.getScanner(META_COLS, new Text());
root.getScanner(META_COLS, new Text(), System.currentTimeMillis(), null);
try {
while(rootScanner.next(key, results)) {
@ -357,7 +355,7 @@ class HMerge implements HConstants {
}
@Override
protected TreeSet<HRegionInfo> next() throws IOException {
protected TreeSet<HRegionInfo> next() {
more = false;
return metaRegions;
}
@ -380,7 +378,7 @@ class HMerge implements HConstants {
root.delete(lockid, COL_REGIONINFO);
root.delete(lockid, COL_SERVER);
root.delete(lockid, COL_STARTCODE);
root.commit(lockid);
root.commit(lockid, System.currentTimeMillis());
lockid = -1L;
if(LOG.isDebugEnabled()) {
@ -405,7 +403,7 @@ class HMerge implements HConstants {
try {
lockid = root.startUpdate(newRegion.getRegionName());
root.put(lockid, COL_REGIONINFO, byteValue.toByteArray());
root.commit(lockid);
root.commit(lockid, System.currentTimeMillis());
lockid = -1L;
if(LOG.isDebugEnabled()) {

View File

@ -95,7 +95,11 @@ public class HRegion implements HConstants {
* HRegionServer. Returns a brand-new active HRegion, also
* running on the current HRegionServer.
*/
static HRegion closeAndMerge(HRegion srcA, HRegion srcB) throws IOException {
static HRegion closeAndMerge(final HRegion srcA, final HRegion srcB)
throws IOException {
HRegion a = srcA;
HRegion b = srcB;
// Make sure that srcA comes first; important for key-ordering during
// write of the merged file.
@ -109,25 +113,24 @@ public class HRegion implements HConstants {
} else if((srcB.getStartKey() == null) // A is not null but B is
|| (srcA.getStartKey().compareTo(srcB.getStartKey()) > 0)) { // A > B
HRegion tmp = srcA;
srcA = srcB;
srcB = tmp;
a = srcB;
b = srcA;
}
if (! srcA.getEndKey().equals(srcB.getStartKey())) {
if (! a.getEndKey().equals(b.getStartKey())) {
throw new IOException("Cannot merge non-adjacent regions");
}
FileSystem fs = srcA.getFilesystem();
Configuration conf = srcA.getConf();
HTableDescriptor tabledesc = srcA.getTableDesc();
HLog log = srcA.getLog();
Path rootDir = srcA.getRootDir();
FileSystem fs = a.getFilesystem();
Configuration conf = a.getConf();
HTableDescriptor tabledesc = a.getTableDesc();
HLog log = a.getLog();
Path rootDir = a.getRootDir();
Text startKey = srcA.getStartKey();
Text endKey = srcB.getEndKey();
Text startKey = a.getStartKey();
Text endKey = b.getEndKey();
Path merges = new Path(srcA.getRegionDir(), MERGEDIR);
Path merges = new Path(a.getRegionDir(), MERGEDIR);
if(! fs.exists(merges)) {
fs.mkdirs(merges);
}
@ -141,8 +144,8 @@ public class HRegion implements HConstants {
throw new IOException("Cannot merge; target file collision at " + newRegionDir);
}
LOG.info("starting merge of regions: " + srcA.getRegionName() + " and "
+ srcB.getRegionName() + " new region start key is '"
LOG.info("starting merge of regions: " + a.getRegionName() + " and "
+ b.getRegionName() + " new region start key is '"
+ (startKey == null ? "" : startKey) + "', end key is '"
+ (endKey == null ? "" : endKey) + "'");
@ -151,7 +154,7 @@ public class HRegion implements HConstants {
TreeSet<HStoreFile> alreadyMerged = new TreeSet<HStoreFile>();
TreeMap<Text, Vector<HStoreFile>> filesToMerge =
new TreeMap<Text, Vector<HStoreFile>>();
for(HStoreFile src: srcA.flushcache(true)) {
for(HStoreFile src: a.flushcache(true)) {
Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
if(v == null) {
v = new Vector<HStoreFile>();
@ -160,7 +163,7 @@ public class HRegion implements HConstants {
v.add(src);
}
for(HStoreFile src: srcB.flushcache(true)) {
for(HStoreFile src: b.flushcache(true)) {
Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
if(v == null) {
v = new Vector<HStoreFile>();
@ -188,12 +191,12 @@ public class HRegion implements HConstants {
if(LOG.isDebugEnabled()) {
LOG.debug("flushing changes since start of merge for region "
+ srcA.getRegionName());
+ a.getRegionName());
}
filesToMerge.clear();
for(HStoreFile src: srcA.close()) {
for(HStoreFile src: a.close()) {
if(! alreadyMerged.contains(src)) {
Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
if(v == null) {
@ -206,10 +209,10 @@ public class HRegion implements HConstants {
if(LOG.isDebugEnabled()) {
LOG.debug("flushing changes since start of merge for region "
+ srcB.getRegionName());
+ b.getRegionName());
}
for(HStoreFile src: srcB.close()) {
for(HStoreFile src: b.close()) {
if(! alreadyMerged.contains(src)) {
Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
if(v == null) {
@ -391,11 +394,13 @@ public class HRegion implements HConstants {
* Close down this HRegion. Flush the cache, shut down each HStore, don't
* service any more calls.
*
* The returned Vector is a list of all the storage files that the HRegion's
* component HStores make use of. It's a list of HStoreFile objects.
*
* This method could take some time to execute, so don't call it from a
* time-sensitive thread.
*
* @return Vector of all the storage files that the HRegion's component
* HStores make use of. It's a list of HStoreFile objects.
*
* @throws IOException
*/
public Vector<HStoreFile> close() throws IOException {
lock.obtainWriteLock();
@ -559,42 +564,52 @@ public class HRegion implements HConstants {
// HRegion accessors
//////////////////////////////////////////////////////////////////////////////
/** @return start key for region */
public Text getStartKey() {
return regionInfo.startKey;
}
/** @return end key for region */
public Text getEndKey() {
return regionInfo.endKey;
}
/** @return region id */
public long getRegionId() {
return regionInfo.regionId;
}
/** @return region name */
public Text getRegionName() {
return regionInfo.regionName;
}
/** @return root directory path */
public Path getRootDir() {
return rootDir;
}
/** @return HTableDescriptor for this region */
public HTableDescriptor getTableDesc() {
return regionInfo.tableDesc;
}
/** @return HLog in use for this region */
public HLog getLog() {
return log;
}
/** @return Configuration object */
public Configuration getConf() {
return conf;
}
/** @return region directory Path */
public Path getRegionDir() {
return regiondir;
}
/** @return FileSystem being used by this region */
public FileSystem getFilesystem() {
return fs;
}
@ -980,21 +995,19 @@ public class HRegion implements HConstants {
}
}
/**
* Return an iterator that scans over the HRegion, returning the indicated
* columns. This Iterator must be closed by the caller.
*/
public HInternalScannerInterface getScanner(Text[] cols, Text firstRow)
throws IOException {
return getScanner(cols, firstRow, null);
}
/**
* Return an iterator that scans over the HRegion, returning the indicated
* columns for only the rows that match the data filter. This Iterator must be closed by the caller.
*
* @param cols columns desired in result set
* @param firstRow row which is the starting point of the scan
* @param timestamp only return rows whose timestamp is <= this value
* @param filter row filter
* @return HScannerInterface
* @throws IOException
*/
public HInternalScannerInterface getScanner(Text[] cols, Text firstRow, RowFilterInterface filter)
throws IOException {
public HInternalScannerInterface getScanner(Text[] cols, Text firstRow,
long timestamp, RowFilterInterface filter) throws IOException {
lock.obtainReadLock();
try {
TreeSet<Text> families = new TreeSet<Text>();
@ -1007,7 +1020,7 @@ public class HRegion implements HConstants {
for (Text family: families) {
storelist[i++] = stores.get(family);
}
return new HScanner(cols, firstRow, memcache, storelist, filter);
return new HScanner(cols, firstRow, timestamp, memcache, storelist, filter);
} finally {
lock.releaseReadLock();
}
@ -1029,6 +1042,7 @@ public class HRegion implements HConstants {
*
* @param row Row to update
* @return lockid
* @throws IOException
* @see #put(long, Text, byte[])
*/
public long startUpdate(Text row) throws IOException {
@ -1048,6 +1062,11 @@ public class HRegion implements HConstants {
*
* This method really just tests the input, then calls an internal localput()
* method.
*
* @param lockid lock id obtained from startUpdate
* @param targetCol name of column to be updated
* @param val new value for column
* @throws IOException
*/
public void put(long lockid, Text targetCol, byte [] val) throws IOException {
if (DELETE_BYTES.compareTo(val) == 0) {
@ -1058,6 +1077,10 @@ public class HRegion implements HConstants {
/**
* Delete a value or write a value. This is a just a convenience method for put().
*
* @param lockid lock id obtained from startUpdate
* @param targetCol name of column to be deleted
* @throws IOException
*/
public void delete(long lockid, Text targetCol) throws IOException {
localput(lockid, targetCol, DELETE_BYTES.get());
@ -1109,6 +1132,9 @@ public class HRegion implements HConstants {
* Abort a pending set of writes. This dumps from memory all in-progress
* writes associated with the given row-lock. These values have not yet
* been placed in memcache or written to the log.
*
* @param lockid lock id obtained from startUpdate
* @throws IOException
*/
public void abort(long lockid) throws IOException {
Text row = getRowFromLock(lockid);
@ -1142,9 +1168,10 @@ public class HRegion implements HConstants {
* Once updates hit the change log, they are safe. They will either be moved
* into an HStore in the future, or they will be recovered from the log.
* @param lockid Lock for row we're to commit.
* @param timestamp the time to associate with this change
* @throws IOException
*/
public void commit(final long lockid) throws IOException {
public void commit(final long lockid, long timestamp) throws IOException {
// Remove the row from the pendingWrites list so
// that repeated executions won't screw this up.
Text row = getRowFromLock(lockid);
@ -1156,12 +1183,11 @@ public class HRegion implements HConstants {
// hasn't aborted/committed the write-operation
synchronized(row) {
// Add updates to the log and add values to the memcache.
long commitTimestamp = System.currentTimeMillis();
TreeMap<Text, byte []> columns = this.targetColumns.get(lockid);
if (columns != null && columns.size() > 0) {
log.append(regionInfo.regionName, regionInfo.tableDesc.getName(),
row, columns, commitTimestamp);
memcache.add(row, columns, commitTimestamp);
row, columns, timestamp);
memcache.add(row, columns, timestamp);
// OK, all done!
}
targetColumns.remove(lockid);
@ -1287,9 +1313,8 @@ public class HRegion implements HConstants {
/** Create an HScanner with a handle on many HStores. */
@SuppressWarnings("unchecked")
HScanner(Text[] cols, Text firstRow, HMemcache memcache, HStore[] stores, RowFilterInterface filter)
throws IOException {
long scanTime = System.currentTimeMillis();
HScanner(Text[] cols, Text firstRow, long timestamp, HMemcache memcache,
HStore[] stores, RowFilterInterface filter) throws IOException {
this.dataFilter = filter;
if (null != dataFilter) {
dataFilter.reset();
@ -1310,7 +1335,7 @@ public class HRegion implements HConstants {
// NOTE: the memcache scanner should be the first scanner
try {
HInternalScannerInterface scanner =
memcache.getScanner(scanTime, cols, firstRow);
memcache.getScanner(timestamp, cols, firstRow);
if(scanner.isWildcardScanner()) {
this.wildcardMatch = true;
}
@ -1320,7 +1345,7 @@ public class HRegion implements HConstants {
scanners[0] = scanner;
for(int i = 0; i < stores.length; i++) {
scanner = stores[i].getScanner(scanTime, cols, firstRow);
scanner = stores[i].getScanner(timestamp, cols, firstRow);
if(scanner.isWildcardScanner()) {
this.wildcardMatch = true;
}
@ -1347,32 +1372,22 @@ public class HRegion implements HConstants {
}
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HInternalScannerInterface#isWildcardScanner()
/**
* {@inheritDoc}
*/
public boolean isWildcardScanner() {
return wildcardMatch;
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HInternalScannerInterface#isMultipleMatchScanner()
/**
* {@inheritDoc}
*/
public boolean isMultipleMatchScanner() {
return multipleMatchers;
}
/*
* (non-Javadoc)
*
* Grab the next row's worth of values. The HScanner will return the most
* recent data value for each row that is not newer than the target time.
*
* If a dataFilter is defined, it will be used to skip rows that do not
* match its criteria. It may cause the scanner to stop prematurely if it
* knows that it will no longer accept the remaining results.
*
* @see org.apache.hadoop.hbase.HInternalScannerInterface#next(org.apache.hadoop.hbase.HStoreKey,
* java.util.TreeMap)
/**
* {@inheritDoc}
*/
public boolean next(HStoreKey key, TreeMap<Text, byte[]> results)
throws IOException {
@ -1501,8 +1516,8 @@ public class HRegion implements HConstants {
}
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HInternalScannerInterface#close()
/**
* {@inheritDoc}
*/
public void close() {
for(int i = 0; i < scanners.length; i++) {
@ -1573,7 +1588,7 @@ public class HRegion implements HConstants {
DataOutputStream s = new DataOutputStream(bytes);
r.getRegionInfo().write(s);
meta.put(writeid, COL_REGIONINFO, bytes.toByteArray());
meta.commit(writeid);
meta.commit(writeid, System.currentTimeMillis());
}
static void addRegionToMETA(final HClient client,

View File

@ -171,10 +171,11 @@ public interface HRegionInterface extends VersionedProtocol {
* @param regionName region name
* @param clientid a unique value to identify the client
* @param lockid lock id returned from startUpdate
* @param timestamp the time (in milliseconds to associate with this change)
* @throws IOException
*/
public void commit(final Text regionName, final long clientid,
final long lockid)
final long lockid, final long timestamp)
throws IOException;
/**
@ -190,31 +191,20 @@ public interface HRegionInterface extends VersionedProtocol {
// remote scanner interface
//////////////////////////////////////////////////////////////////////////////
/**
* Opens a remote scanner.
*
* @param regionName name of region to scan
* @param columns columns to scan
* @param startRow starting row to scan
*
* @return scannerId scanner identifier used in other calls
* @throws IOException
*/
public long openScanner(Text regionName, Text[] columns, Text startRow)
throws IOException;
/**
* Opens a remote scanner with a RowFilter.
*
* @param regionName name of region to scan
* @param columns columns to scan
* @param startRow starting row to scan
* @param timestamp only return values whose timestamp is <= this value
* @param filter RowFilter for filtering results at the row-level.
*
* @return scannerId scanner identifier used in other calls
* @throws IOException
*/
public long openScanner(Text regionName, Text[] columns, Text startRow, RowFilterInterface filter)
public long openScanner(Text regionName, Text[] columns, Text startRow,
long timestamp, RowFilterInterface filter)
throws IOException;
/**

View File

@ -48,8 +48,8 @@ import org.apache.hadoop.util.StringUtils;
******************************************************************************/
public class HRegionServer implements HConstants, HRegionInterface, Runnable {
/* (non-Javadoc)
* @see org.apache.hadoop.ipc.VersionedProtocol#getProtocolVersion(java.lang.String, long)
/**
* {@inheritDoc}
*/
public long getProtocolVersion(final String protocol,
@SuppressWarnings("unused") final long clientVersion)
@ -107,8 +107,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
class SplitOrCompactChecker implements Runnable, RegionUnavailableListener {
HClient client = new HClient(conf);
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.RegionUnavailableListener#closing(org.apache.hadoop.io.Text)
/**
* {@inheritDoc}
*/
public void closing(final Text regionName) {
lock.writeLock().lock();
@ -124,8 +124,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.RegionUnavailableListener#closed(org.apache.hadoop.io.Text)
/**
* {@inheritDoc}
*/
public void closed(final Text regionName) {
lock.writeLock().lock();
@ -139,8 +139,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
/**
* {@inheritDoc}
*/
public void run() {
while(! stopRequested) {
@ -261,8 +261,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
protected final Integer cacheFlusherLock = new Integer(0);
/** Runs periodically to flush the memcache */
class Flusher implements Runnable {
/* (non-Javadoc)
* @see java.lang.Runnable#run()
/**
* {@inheritDoc}
*/
public void run() {
while(! stopRequested) {
@ -327,8 +327,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
private int maxLogEntries =
conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000);
/* (non-Javadoc)
* @see java.lang.Runnable#run()
/**
* {@inheritDoc}
*/
public void run() {
while(! stopRequested) {
@ -769,8 +769,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
/**
* {@inheritDoc}
*/
public void run() {
for(ToDoEntry e = null; !stopRequested; ) {
@ -895,16 +895,16 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// HRegionInterface
//////////////////////////////////////////////////////////////////////////////
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#getRegionInfo(org.apache.hadoop.io.Text)
/**
* {@inheritDoc}
*/
public HRegionInfo getRegionInfo(final Text regionName)
throws NotServingRegionException {
return getRegion(regionName).getRegionInfo();
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#get(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text)
/**
* {@inheritDoc}
*/
public byte [] get(final Text regionName, final Text row,
final Text column)
@ -912,8 +912,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
return getRegion(regionName).get(row, column);
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#get(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, int)
/**
* {@inheritDoc}
*/
public byte [][] get(final Text regionName, final Text row,
final Text column, final int numVersions)
@ -921,16 +921,16 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
return getRegion(regionName).get(row, column, numVersions);
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#get(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, long, int)
/**
* {@inheritDoc}
*/
public byte [][] get(final Text regionName, final Text row, final Text column,
final long timestamp, final int numVersions) throws IOException {
return getRegion(regionName).get(row, column, timestamp, numVersions);
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#getRow(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text)
/**
* {@inheritDoc}
*/
public KeyedData[] getRow(final Text regionName, final Text row) throws IOException {
HRegion region = getRegion(regionName);
@ -944,8 +944,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
return result;
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#next(long)
/**
* {@inheritDoc}
*/
public KeyedData[] next(final long scannerId)
throws IOException {
@ -993,8 +993,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
return values.toArray(new KeyedData[values.size()]);
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#startUpdate(org.apache.hadoop.io.Text, long, org.apache.hadoop.io.Text)
/**
* {@inheritDoc}
*/
public long startUpdate(Text regionName, long clientid, Text row)
throws IOException {
@ -1015,6 +1015,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
this.localLockId = lockId;
}
/**
* {@inheritDoc}
*/
public void leaseExpired() {
try {
localRegion.abort(localLockId);
@ -1024,8 +1027,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#put(org.apache.hadoop.io.Text, long, long, org.apache.hadoop.io.Text, org.apache.hadoop.io.BytesWritable)
/**
* {@inheritDoc}
*/
public void put(final Text regionName, final long clientid,
final long lockid, final Text column, final byte [] val)
@ -1035,8 +1038,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
region.put(lockid, column, val);
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#delete(org.apache.hadoop.io.Text, long, long, org.apache.hadoop.io.Text)
/**
* {@inheritDoc}
*/
public void delete(Text regionName, long clientid, long lockid, Text column)
throws IOException {
@ -1045,8 +1048,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
region.delete(lockid, column);
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#abort(org.apache.hadoop.io.Text, long, long)
/**
* {@inheritDoc}
*/
public void abort(Text regionName, long clientid, long lockid)
throws IOException {
@ -1055,18 +1058,18 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
region.abort(lockid);
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#commit(org.apache.hadoop.io.Text, long, long)
/**
* {@inheritDoc}
*/
public void commit(Text regionName, long clientid, long lockid)
throws IOException {
public void commit(Text regionName, final long clientid, final long lockid,
final long timestamp) throws IOException {
HRegion region = getRegion(regionName, true);
leases.cancelLease(clientid, lockid);
region.commit(lockid);
region.commit(lockid, timestamp);
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#renewLease(long, long)
/**
* {@inheritDoc}
*/
public void renewLease(long lockid, long clientid) throws IOException {
leases.renewLease(clientid, lockid);
@ -1136,8 +1139,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
this.scannerName = n;
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.LeaseListener#leaseExpired()
/**
* {@inheritDoc}
*/
public void leaseExpired() {
LOG.info("Scanner " + this.scannerName + " lease expired");
@ -1151,25 +1154,17 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
}
/**
* {@inheritDoc}
*/
public long openScanner(final Text regionName, final Text[] cols,
final Text firstRow)
throws IOException{
return openScanner(regionName, cols, firstRow, null);
}
/**
* {@inheritDoc}
*/
public long openScanner(Text regionName, Text[] cols, Text firstRow,
final RowFilterInterface filter)
final long timestamp, final RowFilterInterface filter)
throws IOException {
HRegion r = getRegion(regionName);
long scannerId = -1L;
try {
HInternalScannerInterface s = r.getScanner(cols, firstRow, filter);
HInternalScannerInterface s =
r.getScanner(cols, firstRow, timestamp, filter);
scannerId = rand.nextLong();
String scannerName = String.valueOf(scannerId);
synchronized(scanners) {
@ -1184,8 +1179,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
return scannerId;
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#close(long)
/**
* {@inheritDoc}
*/
public void close(final long scannerId) throws IOException {
String scannerName = String.valueOf(scannerId);

View File

@ -165,7 +165,9 @@ class HRegiondirReader {
HRegion r = new HRegion(this.parentdir, null,
FileSystem.get(this.conf), conf, info, null);
Text [] families = info.tableDesc.families().keySet().toArray(new Text [] {});
HInternalScannerInterface scanner = r.getScanner(families, new Text());
HInternalScannerInterface scanner =
r.getScanner(families, new Text(), System.currentTimeMillis(), null);
HStoreKey key = new HStoreKey();
TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
// Print out table header line.

View File

@ -391,6 +391,7 @@ class HStore implements HConstants {
super(fs, dirName, conf);
}
/** {@inheritDoc} */
@Override
public Writable get(WritableComparable key, Writable val) throws IOException {
// Note - the key being passed to us is always a HStoreKey
@ -407,6 +408,7 @@ class HStore implements HConstants {
return null;
}
/** {@inheritDoc} */
@Override
public WritableComparable getClosest(WritableComparable key, Writable val)
throws IOException {
@ -438,6 +440,7 @@ class HStore implements HConstants {
super(conf, fs, dirName, keyClass, valClass, compression);
}
/** {@inheritDoc} */
@Override
public void append(WritableComparable key, Writable val) throws IOException {
// Note - the key being passed to us is always a HStoreKey
@ -1031,6 +1034,9 @@ class HStore implements HConstants {
Text readcol = readkey.getColumn();
if (results.get(readcol) == null
&& key.matchesWithoutColumn(readkey)) {
if(readval.equals(HConstants.DELETE_BYTES)) {
break;
}
results.put(new Text(readcol), readval.get());
readval = new ImmutableBytesWritable();
} else if(key.getRow().compareTo(readkey.getRow()) > 0) {
@ -1078,10 +1084,14 @@ class HStore implements HConstants {
continue;
}
if (readkey.matchesRowCol(key)) {
if(readval.equals(HConstants.DELETE_BYTES)) {
break;
}
results.add(readval.get());
readval = new ImmutableBytesWritable();
while(map.next(readkey, readval) && readkey.matchesRowCol(key)) {
if (numVersions > 0 && (results.size() >= numVersions)) {
if ((numVersions > 0 && (results.size() >= numVersions))
|| readval.equals(HConstants.DELETE_BYTES)) {
break;
}
results.add(readval.get());

View File

@ -36,6 +36,9 @@ public abstract class AbstractMergeTestBase extends HBaseTestCase {
protected FileSystem fs;
protected Path dir;
/**
* {@inheritDoc}
*/
@Override
public void setUp() throws Exception {
super.setUp();
@ -111,6 +114,9 @@ public abstract class AbstractMergeTestBase extends HBaseTestCase {
}
}
/**
* {@inheritDoc}
*/
@Override
public void tearDown() throws Exception {
super.tearDown();
@ -128,7 +134,7 @@ public abstract class AbstractMergeTestBase extends HBaseTestCase {
+ String.format("%1$05d", i)));
region.put(lockid, COLUMN_NAME, value.get());
region.commit(lockid);
region.commit(lockid, System.currentTimeMillis());
if(i % 10000 == 0) {
System.out.println("Flushing write #" + i);
region.flushcache(false);

View File

@ -104,7 +104,7 @@ public class TestGet extends HBaseTestCase {
r.put(lockid, HConstants.COL_REGIONINFO, bytes.toByteArray());
r.commit(lockid);
r.commit(lockid, System.currentTimeMillis());
lockid = r.startUpdate(ROW_KEY);
@ -120,7 +120,7 @@ public class TestGet extends HBaseTestCase {
r.put(lockid, new Text(HConstants.COLUMN_FAMILY + "region"),
"region".getBytes(HConstants.UTF8_ENCODING));
r.commit(lockid);
r.commit(lockid, System.currentTimeMillis());
// Verify that get works the same from memcache as when reading from disk
// NOTE dumpRegion won't work here because it only reads from disk.
@ -152,7 +152,7 @@ public class TestGet extends HBaseTestCase {
r.put(lockid, new Text(HConstants.COLUMN_FAMILY + "junk"),
"junk".getBytes());
r.commit(lockid);
r.commit(lockid, System.currentTimeMillis());
verifyGet(r, otherServerName);

View File

@ -120,7 +120,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
long writeid = region.startUpdate(new Text("row_" + k));
region.put(writeid, CONTENTS_BASIC, (CONTENTSTR + k).getBytes());
region.put(writeid, new Text(ANCHORNUM + k), (ANCHORSTR + k).getBytes());
region.commit(writeid);
region.commit(writeid, System.currentTimeMillis());
}
System.out.println("Write " + NUM_VALS + " rows. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
@ -275,7 +275,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
long lockid = region.startUpdate(new Text("row_vals1_" + kLabel));
region.put(lockid, cols[0], vals1[k].getBytes());
region.put(lockid, cols[1], vals1[k].getBytes());
region.commit(lockid);
region.commit(lockid, System.currentTimeMillis());
numInserted += 2;
}
@ -286,7 +286,8 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
startTime = System.currentTimeMillis();
HInternalScannerInterface s = region.getScanner(cols, new Text());
HInternalScannerInterface s =
region.getScanner(cols, new Text(), System.currentTimeMillis(), null);
int numFetched = 0;
try {
HStoreKey curKey = new HStoreKey();
@ -331,7 +332,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
startTime = System.currentTimeMillis();
s = region.getScanner(cols, new Text());
s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null);
numFetched = 0;
try {
HStoreKey curKey = new HStoreKey();
@ -373,7 +374,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
long lockid = region.startUpdate(new Text("row_vals1_" + kLabel));
region.put(lockid, cols[0], vals1[k].getBytes());
region.put(lockid, cols[1], vals1[k].getBytes());
region.commit(lockid);
region.commit(lockid, System.currentTimeMillis());
numInserted += 2;
}
@ -384,7 +385,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
startTime = System.currentTimeMillis();
s = region.getScanner(cols, new Text());
s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null);
numFetched = 0;
try {
HStoreKey curKey = new HStoreKey();
@ -429,7 +430,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
startTime = System.currentTimeMillis();
s = region.getScanner(cols, new Text());
s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null);
numFetched = 0;
try {
HStoreKey curKey = new HStoreKey();
@ -464,7 +465,9 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
startTime = System.currentTimeMillis();
s = region.getScanner(cols, new Text("row_vals1_500"));
s = region.getScanner(cols, new Text("row_vals1_500"),
System.currentTimeMillis(), null);
numFetched = 0;
try {
HStoreKey curKey = new HStoreKey();
@ -524,7 +527,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
// Write to the HRegion
long writeid = region.startUpdate(new Text("row_" + k));
region.put(writeid, CONTENTS_BODY, buf1.toString().getBytes());
region.commit(writeid);
region.commit(writeid, System.currentTimeMillis());
if (k > 0 && k % (N_ROWS / 100) == 0) {
System.out.println("Flushing write #" + k);
@ -609,13 +612,16 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
fs.delete(oldRegion2);
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.RegionUnavailableListener#regionIsUnavailable(org.apache.hadoop.io.Text)
/**
* {@inheritDoc}
*/
public void closing(@SuppressWarnings("unused") final Text regionName) {
// We don't use this here. It is only for the HRegionServer
}
/**
* {@inheritDoc}
*/
public void closed(@SuppressWarnings("unused") final Text regionName) {
// We don't use this here. It is only for the HRegionServer
}
@ -633,7 +639,8 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
long startTime = System.currentTimeMillis();
HInternalScannerInterface s = region.getScanner(cols, new Text());
HInternalScannerInterface s =
region.getScanner(cols, new Text(), System.currentTimeMillis(), null);
try {
@ -689,7 +696,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
startTime = System.currentTimeMillis();
s = region.getScanner(cols, new Text());
s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null);
try {
int numFetched = 0;
HStoreKey curKey = new HStoreKey();
@ -726,7 +733,9 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
if(StaticTestEnvironment.debugging) {
startTime = System.currentTimeMillis();
s = region.getScanner(new Text[] { CONTENTS_BODY }, new Text());
s = region.getScanner(new Text[] { CONTENTS_BODY }, new Text(),
System.currentTimeMillis(), null);
try {
int numFetched = 0;
HStoreKey curKey = new HStoreKey();
@ -762,7 +771,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
startTime = System.currentTimeMillis();
s = region.getScanner(cols, new Text());
s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null);
try {
int fetched = 0;

View File

@ -78,7 +78,9 @@ public class TestScanner extends HBaseTestCase {
for(int i = 0; i < scanColumns.length; i++) {
try {
scanner = region.getScanner(scanColumns[i], FIRST_ROW);
scanner = region.getScanner(scanColumns[i], FIRST_ROW,
System.currentTimeMillis(), null);
while(scanner.next(key, results)) {
assertTrue(results.containsKey(HConstants.COL_REGIONINFO));
byte [] val = results.get(HConstants.COL_REGIONINFO);
@ -155,7 +157,7 @@ public class TestScanner extends HBaseTestCase {
DataOutputStream s = new DataOutputStream(byteStream);
HGlobals.rootRegionInfo.write(s);
region.put(lockid, HConstants.COL_REGIONINFO, byteStream.toByteArray());
region.commit(lockid);
region.commit(lockid, System.currentTimeMillis());
// What we just committed is in the memcache. Verify that we can get
// it back both with scanning and get
@ -186,7 +188,7 @@ public class TestScanner extends HBaseTestCase {
region.put(lockid, HConstants.COL_STARTCODE,
String.valueOf(START_CODE).getBytes(HConstants.UTF8_ENCODING));
region.commit(lockid);
region.commit(lockid, System.currentTimeMillis());
// Validate that we can still get the HRegionInfo, even though it is in
// an older row on disk and there is a newer row in the memcache
@ -223,7 +225,7 @@ public class TestScanner extends HBaseTestCase {
region.put(lockid, HConstants.COL_SERVER,
address.toString().getBytes(HConstants.UTF8_ENCODING));
region.commit(lockid);
region.commit(lockid, System.currentTimeMillis());
// Validate again

View File

@ -81,7 +81,7 @@ public class TestScanner2 extends HBaseClusterTestCase {
HClient.RegionLocation rl = client.getRegionLocation(table);
regionServer = client.getHRegionConnection(rl.serverAddress);
scannerId = regionServer.openScanner(rl.regionInfo.regionName,
HMaster.METACOLUMNS, new Text());
HMaster.METACOLUMNS, new Text(), System.currentTimeMillis(), null);
while (true) {
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
KeyedData[] values = regionServer.next(scannerId);

View File

@ -66,6 +66,9 @@ public class TestTableMapReduce extends HBaseTestCase {
"6789".getBytes()
};
/**
* {@inheritDoc}
*/
@Override
public void setUp() throws Exception {
super.setUp();
@ -96,7 +99,7 @@ public class TestTableMapReduce extends HBaseTestCase {
+ String.format("%1$05d", i)));
region.put(lockid, TEXT_INPUT_COLUMN, values[i]);
region.commit(lockid);
region.commit(lockid, System.currentTimeMillis());
}
region.close();
@ -117,6 +120,9 @@ public class TestTableMapReduce extends HBaseTestCase {
}
}
/**
* {@inheritDoc}
*/
@Override
public void tearDown() throws Exception {
super.tearDown();

View File

@ -0,0 +1,175 @@
package org.apache.hadoop.hbase;
import java.util.TreeMap;
import org.apache.hadoop.io.Text;
/** Tests user specifyable time stamps */
public class TestTimestamp extends HBaseClusterTestCase {
private static final long T0 = 10L;
private static final long T1 = 100L;
private static final String COLUMN_NAME = "contents:";
private static final String TABLE_NAME = "test";
private static final String VERSION1 = "version1";
private static final String LATEST = "latest";
private static final Text COLUMN = new Text(COLUMN_NAME);
private static final Text[] COLUMNS = {
COLUMN
};
private static final Text TABLE = new Text(TABLE_NAME);
private static final Text ROW = new Text("row");
private HClient client;
/** constructor */
public TestTimestamp() {
super();
client = new HClient(conf);
}
/** {@inheritDoc} */
@Override
public void setUp() throws Exception {
super.setUp();
HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
desc.addFamily(new HColumnDescriptor(COLUMN_NAME));
try {
client.createTable(desc);
} catch (Exception e) {
e.printStackTrace();
fail();
}
}
/** the test */
public void testTimestamp() {
try {
client.openTable(TABLE);
// store a value specifying an update time
long lockid = client.startUpdate(ROW);
client.put(lockid, COLUMN, VERSION1.getBytes(HConstants.UTF8_ENCODING));
client.commit(lockid, T0);
// store a value specifying 'now' as the update time
lockid = client.startUpdate(ROW);
client.put(lockid, COLUMN, LATEST.getBytes(HConstants.UTF8_ENCODING));
client.commit(lockid);
// delete values older than T1
lockid = client.startUpdate(ROW);
client.delete(lockid, COLUMN);
client.commit(lockid, T1);
// now retrieve...
// the most recent version:
byte[] bytes = client.get(ROW, COLUMN);
assertTrue(bytes != null && bytes.length != 0);
assertTrue(LATEST.equals(new String(bytes, HConstants.UTF8_ENCODING)));
// any version <= time T1
byte[][] values = client.get(ROW, COLUMN, T1, 3);
assertNull(values);
// the version from T0
values = client.get(ROW, COLUMN, T0, 3);
assertTrue(values.length == 1
&& VERSION1.equals(new String(values[0], HConstants.UTF8_ENCODING)));
// flush everything out to disk
HRegionServer s = cluster.regionServers.get(0);
for(HRegion r: s.onlineRegions.values() ) {
r.flushcache(false);
}
// now retrieve...
// the most recent version:
bytes = client.get(ROW, COLUMN);
assertTrue(bytes != null && bytes.length != 0);
assertTrue(LATEST.equals(new String(bytes, HConstants.UTF8_ENCODING)));
// any version <= time T1
values = client.get(ROW, COLUMN, T1, 3);
assertNull(values);
// the version from T0
values = client.get(ROW, COLUMN, T0, 3);
assertTrue(values.length == 1
&& VERSION1.equals(new String(values[0], HConstants.UTF8_ENCODING)));
// three versions older than now
values = client.get(ROW, COLUMN, 3);
assertTrue(values.length == 1
&& LATEST.equals(new String(values[0], HConstants.UTF8_ENCODING)));
// Test scanners
HScannerInterface scanner =
client.obtainScanner(COLUMNS, HClient.EMPTY_START_ROW);
try {
HStoreKey key = new HStoreKey();
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
int count = 0;
while(scanner.next(key, results)) {
count++;
}
assertEquals(count, 1);
assertEquals(results.size(), 1);
} finally {
scanner.close();
}
scanner = client.obtainScanner(COLUMNS, HClient.EMPTY_START_ROW, T1);
try {
HStoreKey key = new HStoreKey();
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
int count = 0;
while(scanner.next(key, results)) {
count++;
}
assertEquals(count, 0);
assertEquals(results.size(), 0);
} finally {
scanner.close();
}
scanner = client.obtainScanner(COLUMNS, HClient.EMPTY_START_ROW, T0);
try {
HStoreKey key = new HStoreKey();
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
int count = 0;
while(scanner.next(key, results)) {
count++;
}
assertEquals(count, 0);
assertEquals(results.size(), 0);
} finally {
scanner.close();
}
} catch (Exception e) {
e.printStackTrace();
fail();
}
}
}