HBASE-798 Provide Client API to explicitly lock and unlock rows (Jonathan Gray via Jim Kellerman)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@685391 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
daccae3d26
commit
e1ab934e5d
|
@ -32,6 +32,8 @@ Release 0.3.0 - Unreleased
|
|||
|
||||
NEW FEATURES
|
||||
HBASE-787 Postgresql to HBase table replication example (Tim Sell via Stack)
|
||||
HBASE-798 Provide Client API to explicitly lock and unlock rows (Jonathan
|
||||
Gray via Jim Kellerman)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
|
|
|
@ -373,7 +373,7 @@ class HMerge implements HConstants {
|
|||
b.delete(COL_STARTCODE);
|
||||
b.delete(COL_SPLITA);
|
||||
b.delete(COL_SPLITB);
|
||||
root.batchUpdate(b);
|
||||
root.batchUpdate(b,null);
|
||||
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("updated columns in row: " + regionsToDelete[r]);
|
||||
|
@ -383,7 +383,7 @@ class HMerge implements HConstants {
|
|||
newInfo.setOffline(true);
|
||||
BatchUpdate b = new BatchUpdate(newRegion.getRegionName());
|
||||
b.put(COL_REGIONINFO, Writables.getBytes(newInfo));
|
||||
root.batchUpdate(b);
|
||||
root.batchUpdate(b,null);
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("updated columns in row: " + newRegion.getRegionName());
|
||||
}
|
||||
|
|
|
@ -639,12 +639,33 @@ public class HTable {
|
|||
*/
|
||||
public RowResult getRow(final byte [] row, final byte [][] columns,
|
||||
final long ts)
|
||||
throws IOException {
|
||||
return getRow(row,columns,ts,null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get selected columns for the specified row at a specified timestamp
|
||||
* using existing row lock.
|
||||
*
|
||||
* @param row row key
|
||||
* @param columns Array of column names and families you want to retrieve.
|
||||
* @param ts timestamp
|
||||
* @param rl row lock
|
||||
* @return RowResult is empty if row does not exist.
|
||||
* @throws IOException
|
||||
*/
|
||||
public RowResult getRow(final byte [] row, final byte [][] columns,
|
||||
final long ts, final RowLock rl)
|
||||
throws IOException {
|
||||
return connection.getRegionServerWithRetries(
|
||||
new ServerCallable<RowResult>(connection, tableName, row) {
|
||||
public RowResult call() throws IOException {
|
||||
long lockId = -1L;
|
||||
if(rl != null) {
|
||||
lockId = rl.getLockId();
|
||||
}
|
||||
return server.getRow(location.getRegionInfo().getRegionName(), row,
|
||||
columns, ts);
|
||||
columns, ts, lockId);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
@ -1103,16 +1124,36 @@ public class HTable {
|
|||
* @throws IOException
|
||||
*/
|
||||
public void deleteAll(final byte [] row, final byte [] column, final long ts)
|
||||
throws IOException {
|
||||
deleteAll(row,column,ts,null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete all cells that match the passed row and column and whose
|
||||
* timestamp is equal-to or older than the passed timestamp, using an
|
||||
* existing row lock.
|
||||
* @param row Row to update
|
||||
* @param column name of column whose value is to be deleted
|
||||
* @param ts Delete all cells of the same timestamp or older.
|
||||
* @param rl Existing row lock
|
||||
* @throws IOException
|
||||
*/
|
||||
public void deleteAll(final byte [] row, final byte [] column, final long ts,
|
||||
final RowLock rl)
|
||||
throws IOException {
|
||||
connection.getRegionServerWithRetries(
|
||||
new ServerCallable<Boolean>(connection, tableName, row) {
|
||||
public Boolean call() throws IOException {
|
||||
long lockId = -1L;
|
||||
if(rl != null) {
|
||||
lockId = rl.getLockId();
|
||||
}
|
||||
if (column != null) {
|
||||
this.server.deleteAll(location.getRegionInfo().getRegionName(),
|
||||
row, column, ts);
|
||||
row, column, ts, lockId);
|
||||
} else {
|
||||
this.server.deleteAll(location.getRegionInfo().getRegionName(),
|
||||
row, ts);
|
||||
row, ts, lockId);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -1160,12 +1201,32 @@ public class HTable {
|
|||
*/
|
||||
public void deleteFamily(final byte [] row, final byte [] family,
|
||||
final long timestamp)
|
||||
throws IOException {
|
||||
deleteFamily(row,family,timestamp,null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete all cells for a row with matching column family with timestamps
|
||||
* less than or equal to <i>timestamp</i>, using existing row lock.
|
||||
*
|
||||
* @param row The row to operate on
|
||||
* @param family The column family to match
|
||||
* @param timestamp Timestamp to match
|
||||
* @param rl Existing row lock
|
||||
* @throws IOException
|
||||
*/
|
||||
public void deleteFamily(final byte [] row, final byte [] family,
|
||||
final long timestamp, final RowLock rl)
|
||||
throws IOException {
|
||||
connection.getRegionServerWithRetries(
|
||||
new ServerCallable<Boolean>(connection, tableName, row) {
|
||||
public Boolean call() throws IOException {
|
||||
long lockId = -1L;
|
||||
if(rl != null) {
|
||||
lockId = rl.getLockId();
|
||||
}
|
||||
server.deleteFamily(location.getRegionInfo().getRegionName(), row,
|
||||
family, timestamp);
|
||||
family, timestamp, lockId);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -1178,12 +1239,28 @@ public class HTable {
|
|||
* @throws IOException
|
||||
*/
|
||||
public synchronized void commit(final BatchUpdate batchUpdate)
|
||||
throws IOException {
|
||||
commit(batchUpdate,null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Commit a BatchUpdate to the table using existing row lock.
|
||||
* @param batchUpdate
|
||||
* @param rl Existing row lock
|
||||
* @throws IOException
|
||||
*/
|
||||
public synchronized void commit(final BatchUpdate batchUpdate,
|
||||
final RowLock rl)
|
||||
throws IOException {
|
||||
connection.getRegionServerWithRetries(
|
||||
new ServerCallable<Boolean>(connection, tableName, batchUpdate.getRow()) {
|
||||
public Boolean call() throws IOException {
|
||||
long lockId = -1L;
|
||||
if(rl != null) {
|
||||
lockId = rl.getLockId();
|
||||
}
|
||||
server.batchUpdate(location.getRegionInfo().getRegionName(),
|
||||
batchUpdate);
|
||||
batchUpdate, lockId);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -1198,7 +1275,45 @@ public class HTable {
|
|||
public synchronized void commit(final List<BatchUpdate> batchUpdates)
|
||||
throws IOException {
|
||||
for (BatchUpdate batchUpdate : batchUpdates)
|
||||
commit(batchUpdate);
|
||||
commit(batchUpdate,null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain a row lock
|
||||
* @param row The row to lock
|
||||
* @return rowLock RowLock containing row and lock id
|
||||
* @throws IOException
|
||||
*/
|
||||
public RowLock lockRow(final byte [] row)
|
||||
throws IOException {
|
||||
return connection.getRegionServerWithRetries(
|
||||
new ServerCallable<RowLock>(connection, tableName, row) {
|
||||
public RowLock call() throws IOException {
|
||||
long lockId =
|
||||
server.lockRow(location.getRegionInfo().getRegionName(), row);
|
||||
RowLock rowLock = new RowLock(row,lockId);
|
||||
return rowLock;
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Release a row lock
|
||||
* @param rl The row lock to release
|
||||
* @throws IOException
|
||||
*/
|
||||
public void unlockRow(final RowLock rl)
|
||||
throws IOException {
|
||||
connection.getRegionServerWithRetries(
|
||||
new ServerCallable<Boolean>(connection, tableName, rl.getRow()) {
|
||||
public Boolean call() throws IOException {
|
||||
server.unlockRow(location.getRegionInfo().getRegionName(),
|
||||
rl.getLockId());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -111,11 +111,12 @@ public interface HRegionInterface extends VersionedProtocol {
|
|||
*
|
||||
* @param regionName region name
|
||||
* @param row row key
|
||||
* @param lockId lock id
|
||||
* @return map of values
|
||||
* @throws IOException
|
||||
*/
|
||||
public RowResult getRow(final byte [] regionName, final byte [] row,
|
||||
final byte[][] columns, final long ts)
|
||||
final byte[][] columns, final long ts, final long lockId)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
|
@ -123,9 +124,11 @@ public interface HRegionInterface extends VersionedProtocol {
|
|||
*
|
||||
* @param regionName name of the region to update
|
||||
* @param b BatchUpdate
|
||||
* @param lockId lock id
|
||||
* @throws IOException
|
||||
*/
|
||||
public void batchUpdate(final byte [] regionName, final BatchUpdate b)
|
||||
public void batchUpdate(final byte [] regionName, final BatchUpdate b,
|
||||
final long lockId)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
|
@ -136,10 +139,11 @@ public interface HRegionInterface extends VersionedProtocol {
|
|||
* @param row row key
|
||||
* @param column column key
|
||||
* @param timestamp Delete all entries that have this timestamp or older
|
||||
* @param lockId lock id
|
||||
* @throws IOException
|
||||
*/
|
||||
public void deleteAll(byte [] regionName, byte [] row, byte [] column,
|
||||
long timestamp)
|
||||
long timestamp, long lockId)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
|
@ -149,9 +153,11 @@ public interface HRegionInterface extends VersionedProtocol {
|
|||
* @param regionName region name
|
||||
* @param row row key
|
||||
* @param timestamp Delete all entries that have this timestamp or older
|
||||
* @param lockId lock id
|
||||
* @throws IOException
|
||||
*/
|
||||
public void deleteAll(byte [] regionName, byte [] row, long timestamp)
|
||||
public void deleteAll(byte [] regionName, byte [] row, long timestamp,
|
||||
long lockId)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
|
@ -162,9 +168,10 @@ public interface HRegionInterface extends VersionedProtocol {
|
|||
* @param row The row to operate on
|
||||
* @param family The column family to match
|
||||
* @param timestamp Timestamp to match
|
||||
* @param lockId lock id
|
||||
*/
|
||||
public void deleteFamily(byte [] regionName, byte [] row, byte [] family,
|
||||
long timestamp)
|
||||
long timestamp, long lockId)
|
||||
throws IOException;
|
||||
|
||||
|
||||
|
@ -207,4 +214,24 @@ public interface HRegionInterface extends VersionedProtocol {
|
|||
* @throws IOException
|
||||
*/
|
||||
public void close(long scannerId) throws IOException;
|
||||
|
||||
/**
|
||||
* Opens a remote row lock.
|
||||
*
|
||||
* @param regionName name of region
|
||||
* @param row row to lock
|
||||
* @return lockId lock identifier
|
||||
* @throws IOException
|
||||
*/
|
||||
public long lockRow(final byte [] regionName, final byte [] row)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Releases a remote row lock.
|
||||
*
|
||||
* @param lockId the lock id returned by lockRow
|
||||
* @throws IOException
|
||||
*/
|
||||
public void unlockRow(final byte [] regionName, final long lockId)
|
||||
throws IOException;
|
||||
}
|
|
@ -332,7 +332,7 @@ abstract class BaseScanner extends Chore implements HConstants {
|
|||
|
||||
BatchUpdate b = new BatchUpdate(parent);
|
||||
b.delete(splitColumn);
|
||||
srvr.batchUpdate(metaRegionName, b);
|
||||
srvr.batchUpdate(metaRegionName, b, -1L);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -91,7 +91,7 @@ class ChangeTableState extends TableOperation {
|
|||
updateRegionInfo(b, i);
|
||||
b.delete(COL_SERVER);
|
||||
b.delete(COL_STARTCODE);
|
||||
server.batchUpdate(m.getRegionName(), b);
|
||||
server.batchUpdate(m.getRegionName(), b, -1L);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("updated columns in row: " + i.getRegionNameAsString());
|
||||
}
|
||||
|
|
|
@ -52,7 +52,7 @@ abstract class ColumnOperation extends TableOperation {
|
|||
throws IOException {
|
||||
BatchUpdate b = new BatchUpdate(i.getRegionName());
|
||||
b.put(COL_REGIONINFO, Writables.getBytes(i));
|
||||
server.batchUpdate(regionName, b);
|
||||
server.batchUpdate(regionName, b, -1L);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("updated columns in row: " + i.getRegionNameAsString());
|
||||
}
|
||||
|
|
|
@ -52,7 +52,7 @@ class ModifyTableMeta extends TableOperation {
|
|||
throws IOException {
|
||||
BatchUpdate b = new BatchUpdate(i.getRegionName());
|
||||
b.put(COL_REGIONINFO, Writables.getBytes(i));
|
||||
server.batchUpdate(regionName, b);
|
||||
server.batchUpdate(regionName, b, -1L);
|
||||
LOG.debug("updated HTableDescriptor for region " + i.getRegionNameAsString());
|
||||
}
|
||||
|
||||
|
|
|
@ -83,7 +83,7 @@ class ProcessRegionOpen extends ProcessRegionStatusChange {
|
|||
BatchUpdate b = new BatchUpdate(regionInfo.getRegionName());
|
||||
b.put(COL_SERVER, Bytes.toBytes(serverAddress.toString()));
|
||||
b.put(COL_STARTCODE, startCode);
|
||||
server.batchUpdate(metaRegionName, b);
|
||||
server.batchUpdate(metaRegionName, b, -1L);
|
||||
if (!this.historian.isOnline()) {
|
||||
// This is safest place to do the onlining of the historian in
|
||||
// the master. When we get to here, we know there is a .META.
|
||||
|
|
|
@ -560,7 +560,7 @@ class RegionManager implements HConstants {
|
|||
byte [] regionName = region.getRegionName();
|
||||
BatchUpdate b = new BatchUpdate(regionName);
|
||||
b.put(COL_REGIONINFO, Writables.getBytes(info));
|
||||
server.batchUpdate(metaRegionName, b);
|
||||
server.batchUpdate(metaRegionName, b, -1L);
|
||||
|
||||
// 4. Close the new region to flush it to disk. Close its log file too.
|
||||
region.close();
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/**
|
||||
* Copyright 2007 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
|
@ -1170,7 +1170,7 @@ public class HRegion implements HConstants {
|
|||
* @throws IOException
|
||||
*/
|
||||
public Map<byte [], Cell> getFull(final byte [] row,
|
||||
final Set<byte []> columns, final long ts)
|
||||
final Set<byte []> columns, final long ts, final Integer lockid)
|
||||
throws IOException {
|
||||
// Check columns passed
|
||||
if (columns != null) {
|
||||
|
@ -1179,7 +1179,7 @@ public class HRegion implements HConstants {
|
|||
}
|
||||
}
|
||||
HStoreKey key = new HStoreKey(row, ts);
|
||||
Integer lid = obtainRowLock(row);
|
||||
Integer lid = getLock(lockid,row);
|
||||
HashSet<HStore> storeSet = new HashSet<HStore>();
|
||||
try {
|
||||
TreeMap<byte [], Cell> result =
|
||||
|
@ -1215,7 +1215,7 @@ public class HRegion implements HConstants {
|
|||
|
||||
return result;
|
||||
} finally {
|
||||
releaseRowLock(lid);
|
||||
if(lockid == null) releaseRowLock(lid);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1347,7 +1347,7 @@ public class HRegion implements HConstants {
|
|||
* @param b
|
||||
* @throws IOException
|
||||
*/
|
||||
public void batchUpdate(BatchUpdate b)
|
||||
public void batchUpdate(BatchUpdate b, Integer lockid)
|
||||
throws IOException {
|
||||
checkReadOnly();
|
||||
|
||||
|
@ -1363,7 +1363,8 @@ public class HRegion implements HConstants {
|
|||
// See HRegionServer#RegionListener for how the expire on HRegionServer
|
||||
// invokes a HRegion#abort.
|
||||
byte [] row = b.getRow();
|
||||
Integer lid = obtainRowLock(row);
|
||||
// If we did not pass an existing row lock, obtain a new one
|
||||
Integer lid = getLock(lockid,row);
|
||||
long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP) ?
|
||||
System.currentTimeMillis() : b.getTimestamp();
|
||||
try {
|
||||
|
@ -1408,7 +1409,7 @@ public class HRegion implements HConstants {
|
|||
this.targetColumns.remove(Long.valueOf(lid));
|
||||
throw e;
|
||||
} finally {
|
||||
releaseRowLock(lid);
|
||||
if(lockid == null) releaseRowLock(lid);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1458,17 +1459,19 @@ public class HRegion implements HConstants {
|
|||
* @param row
|
||||
* @param column
|
||||
* @param ts Delete all entries that have this timestamp or older
|
||||
* @param lockid Row lock
|
||||
* @throws IOException
|
||||
*/
|
||||
public void deleteAll(final byte [] row, final byte [] column, final long ts)
|
||||
public void deleteAll(final byte [] row, final byte [] column, final long ts,
|
||||
final Integer lockid)
|
||||
throws IOException {
|
||||
checkColumn(column);
|
||||
checkReadOnly();
|
||||
Integer lid = obtainRowLock(row);
|
||||
Integer lid = getLock(lockid,row);
|
||||
try {
|
||||
deleteMultiple(row, column, ts, ALL_VERSIONS);
|
||||
} finally {
|
||||
releaseRowLock(lid);
|
||||
if(lockid == null) releaseRowLock(lid);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1476,12 +1479,14 @@ public class HRegion implements HConstants {
|
|||
* Delete all cells of the same age as the passed timestamp or older.
|
||||
* @param row
|
||||
* @param ts Delete all entries that have this timestamp or older
|
||||
* @param lockid Row lock
|
||||
* @throws IOException
|
||||
*/
|
||||
public void deleteAll(final byte [] row, final long ts)
|
||||
public void deleteAll(final byte [] row, final long ts,
|
||||
final Integer lockid)
|
||||
throws IOException {
|
||||
checkReadOnly();
|
||||
Integer lid = obtainRowLock(row);
|
||||
Integer lid = getLock(lockid,row);
|
||||
try {
|
||||
for (HStore store : stores.values()){
|
||||
List<HStoreKey> keys = store.getKeys(new HStoreKey(row, ts),
|
||||
|
@ -1493,7 +1498,7 @@ public class HRegion implements HConstants {
|
|||
update(edits);
|
||||
}
|
||||
} finally {
|
||||
releaseRowLock(lid);
|
||||
if(lockid == null) releaseRowLock(lid);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1504,12 +1509,14 @@ public class HRegion implements HConstants {
|
|||
* @param row The row to operate on
|
||||
* @param family The column family to match
|
||||
* @param timestamp Timestamp to match
|
||||
* @param lockid Row lock
|
||||
* @throws IOException
|
||||
*/
|
||||
public void deleteFamily(byte [] row, byte [] family, long timestamp)
|
||||
public void deleteFamily(byte [] row, byte [] family, long timestamp,
|
||||
final Integer lockid)
|
||||
throws IOException{
|
||||
checkReadOnly();
|
||||
Integer lid = obtainRowLock(row);
|
||||
Integer lid = getLock(lockid,row);
|
||||
try {
|
||||
// find the HStore for the column family
|
||||
HStore store = getStore(family);
|
||||
|
@ -1522,7 +1529,7 @@ public class HRegion implements HConstants {
|
|||
}
|
||||
update(edits);
|
||||
} finally {
|
||||
releaseRowLock(lid);
|
||||
if(lockid == null) releaseRowLock(lid);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1778,6 +1785,41 @@ public class HRegion implements HConstants {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* See if row is currently locked.
|
||||
* @param lockid
|
||||
* @return boolean
|
||||
*/
|
||||
private boolean isRowLocked(final Integer lockid) {
|
||||
synchronized (locksToRows) {
|
||||
if(locksToRows.containsKey(lockid)) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns existing row lock if found, otherwise
|
||||
* obtains a new row lock and returns it.
|
||||
* @param lockid
|
||||
* @return lockid
|
||||
*/
|
||||
private Integer getLock(Integer lockid, byte [] row)
|
||||
throws IOException {
|
||||
Integer lid = null;
|
||||
if(lockid == null) {
|
||||
lid = obtainRowLock(row);
|
||||
} else {
|
||||
if(!isRowLocked(lockid)) {
|
||||
throw new IOException("Invalid row lock");
|
||||
}
|
||||
lid = lockid;
|
||||
}
|
||||
return lid;
|
||||
}
|
||||
|
||||
private void waitOnRowLocks() {
|
||||
synchronized (locksToRows) {
|
||||
while (this.locksToRows.size() > 0) {
|
||||
|
@ -2134,7 +2176,8 @@ public class HRegion implements HConstants {
|
|||
public static void removeRegionFromMETA(final HRegionInterface srvr,
|
||||
final byte [] metaRegionName, final byte [] regionName)
|
||||
throws IOException {
|
||||
srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP);
|
||||
srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP,
|
||||
(long)-1L);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2155,7 +2198,7 @@ public class HRegion implements HConstants {
|
|||
b.delete(COL_STARTCODE);
|
||||
// If carrying splits, they'll be in place when we show up on new
|
||||
// server.
|
||||
srvr.batchUpdate(metaRegionName, b);
|
||||
srvr.batchUpdate(metaRegionName, b, (long)-1L);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
|
|||
import org.apache.hadoop.hbase.RegionHistorian;
|
||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||
import org.apache.hadoop.hbase.UnknownScannerException;
|
||||
import org.apache.hadoop.hbase.UnknownRowLockException;
|
||||
import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
|
||||
import org.apache.hadoop.hbase.filter.RowFilterInterface;
|
||||
import org.apache.hadoop.hbase.io.BatchOperation;
|
||||
|
@ -1048,7 +1049,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
|
||||
/** {@inheritDoc} */
|
||||
public RowResult getRow(final byte [] regionName, final byte [] row,
|
||||
final byte [][] columns, final long ts)
|
||||
final byte [][] columns, final long ts, final long lockId)
|
||||
throws IOException {
|
||||
checkOpen();
|
||||
requestCount.incrementAndGet();
|
||||
|
@ -1061,7 +1062,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
}
|
||||
|
||||
HRegion region = getRegion(regionName);
|
||||
Map<byte [], Cell> map = region.getFull(row, columnSet, ts);
|
||||
Map<byte [], Cell> map = region.getFull(row, columnSet, ts,
|
||||
getLockFromId(lockId));
|
||||
HbaseMapWritable<byte [], Cell> result =
|
||||
new HbaseMapWritable<byte [], Cell>();
|
||||
result.putAll(map);
|
||||
|
@ -1126,7 +1128,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public void batchUpdate(final byte [] regionName, BatchUpdate b)
|
||||
public void batchUpdate(final byte [] regionName, BatchUpdate b, long lockId)
|
||||
throws IOException {
|
||||
checkOpen();
|
||||
this.requestCount.incrementAndGet();
|
||||
|
@ -1134,7 +1136,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
validateValuesLength(b, region);
|
||||
try {
|
||||
cacheFlusher.reclaimMemcacheMemory();
|
||||
region.batchUpdate(b);
|
||||
region.batchUpdate(b, getLockFromId(lockId));
|
||||
} catch (OutOfMemoryError error) {
|
||||
abort();
|
||||
LOG.fatal("Ran out of memory", error);
|
||||
|
@ -1239,7 +1241,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
}
|
||||
|
||||
Map<String, InternalScanner> scanners =
|
||||
Collections.synchronizedMap(new HashMap<String, InternalScanner>());
|
||||
new ConcurrentHashMap<String, InternalScanner>();
|
||||
|
||||
/**
|
||||
* Instantiated as a scanner lease.
|
||||
|
@ -1275,26 +1277,157 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
|
||||
/** {@inheritDoc} */
|
||||
public void deleteAll(final byte [] regionName, final byte [] row,
|
||||
final byte [] column, final long timestamp)
|
||||
final byte [] column, final long timestamp, final long lockId)
|
||||
throws IOException {
|
||||
HRegion region = getRegion(regionName);
|
||||
region.deleteAll(row, column, timestamp);
|
||||
region.deleteAll(row, column, timestamp, getLockFromId(lockId));
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public void deleteAll(final byte [] regionName, final byte [] row,
|
||||
final long timestamp)
|
||||
final long timestamp, final long lockId)
|
||||
throws IOException {
|
||||
HRegion region = getRegion(regionName);
|
||||
region.deleteAll(row, timestamp);
|
||||
region.deleteAll(row, timestamp, getLockFromId(lockId));
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public void deleteFamily(byte [] regionName, byte [] row, byte [] family,
|
||||
long timestamp) throws IOException{
|
||||
getRegion(regionName).deleteFamily(row, family, timestamp);
|
||||
long timestamp, final long lockId)
|
||||
throws IOException{
|
||||
getRegion(regionName).deleteFamily(row, family, timestamp,
|
||||
getLockFromId(lockId));
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public long lockRow(byte [] regionName, byte [] row)
|
||||
throws IOException {
|
||||
checkOpen();
|
||||
NullPointerException npe = null;
|
||||
if(regionName == null) {
|
||||
npe = new NullPointerException("regionName is null");
|
||||
} else if(row == null) {
|
||||
npe = new NullPointerException("row to lock is null");
|
||||
}
|
||||
if(npe != null) {
|
||||
IOException io = new IOException("Invalid arguments to lockRow");
|
||||
io.initCause(npe);
|
||||
throw io;
|
||||
}
|
||||
requestCount.incrementAndGet();
|
||||
try {
|
||||
HRegion region = getRegion(regionName);
|
||||
Integer r = region.obtainRowLock(row);
|
||||
long lockId = addRowLock(r,region);
|
||||
LOG.debug("Row lock " + lockId + " explicitly acquired by client");
|
||||
return lockId;
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error obtaining row lock (fsOk: " + this.fsOk + ")",
|
||||
RemoteExceptionHandler.checkIOException(e));
|
||||
checkFileSystem();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
protected long addRowLock(Integer r, HRegion region) throws LeaseStillHeldException {
|
||||
long lockId = -1L;
|
||||
lockId = rand.nextLong();
|
||||
String lockName = String.valueOf(lockId);
|
||||
synchronized(rowlocks) {
|
||||
rowlocks.put(lockName, r);
|
||||
}
|
||||
this.leases.
|
||||
createLease(lockName, new RowLockListener(lockName, region));
|
||||
return lockId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Method to get the Integer lock identifier used internally
|
||||
* from the long lock identifier used by the client.
|
||||
* @param lockId long row lock identifier from client
|
||||
* @return intId Integer row lock used internally in HRegion
|
||||
* @throws IOException Thrown if this is not a valid client lock id.
|
||||
*/
|
||||
private Integer getLockFromId(long lockId)
|
||||
throws IOException {
|
||||
if(lockId == -1L) {
|
||||
return null;
|
||||
}
|
||||
String lockName = String.valueOf(lockId);
|
||||
Integer rl = null;
|
||||
synchronized(rowlocks) {
|
||||
rl = rowlocks.get(lockName);
|
||||
}
|
||||
if(rl == null) {
|
||||
throw new IOException("Invalid row lock");
|
||||
}
|
||||
this.leases.renewLease(lockName);
|
||||
return rl;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public void unlockRow(byte [] regionName, long lockId)
|
||||
throws IOException {
|
||||
checkOpen();
|
||||
NullPointerException npe = null;
|
||||
if(regionName == null) {
|
||||
npe = new NullPointerException("regionName is null");
|
||||
} else if(lockId == -1L) {
|
||||
npe = new NullPointerException("lockId is null");
|
||||
}
|
||||
if(npe != null) {
|
||||
IOException io = new IOException("Invalid arguments to unlockRow");
|
||||
io.initCause(npe);
|
||||
throw io;
|
||||
}
|
||||
requestCount.incrementAndGet();
|
||||
try {
|
||||
HRegion region = getRegion(regionName);
|
||||
String lockName = String.valueOf(lockId);
|
||||
Integer r = null;
|
||||
synchronized(rowlocks) {
|
||||
r = rowlocks.remove(lockName);
|
||||
}
|
||||
if(r == null) {
|
||||
throw new UnknownRowLockException(lockName);
|
||||
}
|
||||
region.releaseRowLock(r);
|
||||
this.leases.cancelLease(lockName);
|
||||
LOG.debug("Row lock " + lockId + " has been explicitly released by client");
|
||||
} catch (IOException e) {
|
||||
checkFileSystem();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, Integer> rowlocks =
|
||||
new ConcurrentHashMap<String, Integer>();
|
||||
|
||||
/**
|
||||
* Instantiated as a row lock lease.
|
||||
* If the lease times out, the row lock is released
|
||||
*/
|
||||
private class RowLockListener implements LeaseListener {
|
||||
private final String lockName;
|
||||
private final HRegion region;
|
||||
|
||||
RowLockListener(final String lockName, final HRegion region) {
|
||||
this.lockName = lockName;
|
||||
this.region = region;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public void leaseExpired() {
|
||||
LOG.info("Row Lock " + this.lockName + " lease expired");
|
||||
Integer r = null;
|
||||
synchronized(rowlocks) {
|
||||
r = rowlocks.remove(this.lockName);
|
||||
}
|
||||
if(r != null) {
|
||||
region.releaseRowLock(r);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Info on this server.
|
||||
|
|
|
@ -308,7 +308,7 @@ public class Merge extends Configured implements Tool {
|
|||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Removing region: " + regioninfo + " from " + meta);
|
||||
}
|
||||
meta.deleteAll(regioninfo.getRegionName(), System.currentTimeMillis());
|
||||
meta.deleteAll(regioninfo.getRegionName(), System.currentTimeMillis(), null);
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -407,7 +407,7 @@ public class MetaUtils {
|
|||
}
|
||||
BatchUpdate b = new BatchUpdate(hri.getRegionName());
|
||||
b.put(HConstants.COL_REGIONINFO, Writables.getBytes(hri));
|
||||
r.batchUpdate(b);
|
||||
r.batchUpdate(b, null);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
HRegionInfo h = Writables.getHRegionInfoOrNull(
|
||||
r.get(hri.getRegionName(), HConstants.COL_REGIONINFO).getValue());
|
||||
|
|
|
@ -2050,7 +2050,8 @@ public class HRegion implements HConstants {
|
|||
public static void removeRegionFromMETA(final HRegionInterface srvr,
|
||||
final byte [] metaRegionName, final byte [] regionName)
|
||||
throws IOException {
|
||||
srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP);
|
||||
srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP,
|
||||
-1L);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2071,7 +2072,7 @@ public class HRegion implements HConstants {
|
|||
b.delete(COL_STARTCODE);
|
||||
// If carrying splits, they'll be in place when we show up on new
|
||||
// server.
|
||||
srvr.batchUpdate(metaRegionName, b);
|
||||
srvr.batchUpdate(metaRegionName, b, -1L);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue