diff --git a/CHANGES.txt b/CHANGES.txt
index becddd4e9cf..216aac8c4fb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -32,6 +32,8 @@ Release 0.3.0 - Unreleased
   NEW FEATURES
    HBASE-787   Postgresql to HBase table replication example (Tim Sell via Stack)
+   HBASE-798   Provide Client API to explicitly lock and unlock rows (Jonathan
+               Gray via Jim Kellerman)
 
   OPTIMIZATIONS
 
diff --git a/src/java/org/apache/hadoop/hbase/HMerge.java b/src/java/org/apache/hadoop/hbase/HMerge.java
index f43b35e1fc5..f87b6c7d577 100644
--- a/src/java/org/apache/hadoop/hbase/HMerge.java
+++ b/src/java/org/apache/hadoop/hbase/HMerge.java
@@ -373,7 +373,7 @@ class HMerge implements HConstants {
           b.delete(COL_STARTCODE);
           b.delete(COL_SPLITA);
           b.delete(COL_SPLITB);
-          root.batchUpdate(b);
+          root.batchUpdate(b,null);
 
           if(LOG.isDebugEnabled()) {
             LOG.debug("updated columns in row: " + regionsToDelete[r]);
@@ -383,7 +383,7 @@ class HMerge implements HConstants {
       newInfo.setOffline(true);
       BatchUpdate b = new BatchUpdate(newRegion.getRegionName());
       b.put(COL_REGIONINFO, Writables.getBytes(newInfo));
-      root.batchUpdate(b);
+      root.batchUpdate(b,null);
       if(LOG.isDebugEnabled()) {
         LOG.debug("updated columns in row: " + newRegion.getRegionName());
       }
diff --git a/src/java/org/apache/hadoop/hbase/client/HTable.java b/src/java/org/apache/hadoop/hbase/client/HTable.java
index 6d086e91fc4..676da7b6b35 100644
--- a/src/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/src/java/org/apache/hadoop/hbase/client/HTable.java
@@ -639,12 +639,33 @@ public class HTable {
    */
   public RowResult getRow(final byte [] row, final byte [][] columns,
     final long ts)
+  throws IOException {
+    return getRow(row,columns,ts,null);
+  }
+
+  /**
+   * Get selected columns for the specified row at a specified timestamp
+   * using existing row lock.
+   *
+   * @param row row key
+   * @param columns Array of column names and families you want to retrieve.
+   * @param ts timestamp
+   * @param rl row lock
+   * @return RowResult is empty if row does not exist.
+   * @throws IOException
+   */
+  public RowResult getRow(final byte [] row, final byte [][] columns,
+    final long ts, final RowLock rl)
   throws IOException {
     return connection.getRegionServerWithRetries(
         new ServerCallable(connection, tableName, row) {
           public RowResult call() throws IOException {
+            long lockId = -1L;
+            if(rl != null) {
+              lockId = rl.getLockId();
+            }
             return server.getRow(location.getRegionInfo().getRegionName(), row,
-              columns, ts);
+              columns, ts, lockId);
           }
         }
     );
@@ -1103,16 +1124,36 @@ public class HTable {
    * @throws IOException
    */
   public void deleteAll(final byte [] row, final byte [] column, final long ts)
+  throws IOException {
+    deleteAll(row,column,ts,null);
+  }
+
+  /**
+   * Delete all cells that match the passed row and column and whose
+   * timestamp is equal-to or older than the passed timestamp, using an
+   * existing row lock.
+   * @param row Row to update
+   * @param column name of column whose value is to be deleted
+   * @param ts Delete all cells of the same timestamp or older.
+   * @param rl Existing row lock
+   * @throws IOException
+   */
+  public void deleteAll(final byte [] row, final byte [] column, final long ts,
+    final RowLock rl)
   throws IOException {
     connection.getRegionServerWithRetries(
         new ServerCallable(connection, tableName, row) {
           public Boolean call() throws IOException {
+            long lockId = -1L;
+            if(rl != null) {
+              lockId = rl.getLockId();
+            }
             if (column != null) {
               this.server.deleteAll(location.getRegionInfo().getRegionName(),
-                row, column, ts);
+                row, column, ts, lockId);
             } else {
               this.server.deleteAll(location.getRegionInfo().getRegionName(),
-                row, ts);
+                row, ts, lockId);
             }
             return null;
           }
         }
     );
@@ -1160,12 +1201,32 @@ public class HTable {
    */
   public void deleteFamily(final byte [] row, final byte [] family,
     final long timestamp)
+  throws IOException {
+    deleteFamily(row,family,timestamp,null);
+  }
+
+  /**
+   * Delete all cells for a row with matching column family with timestamps
+   * less than or equal to timestamp, using existing row lock.
+   *
+   * @param row The row to operate on
+   * @param family The column family to match
+   * @param timestamp Timestamp to match
+   * @param rl Existing row lock
+   * @throws IOException
+   */
+  public void deleteFamily(final byte [] row, final byte [] family,
+    final long timestamp, final RowLock rl)
   throws IOException {
     connection.getRegionServerWithRetries(
         new ServerCallable(connection, tableName, row) {
           public Boolean call() throws IOException {
+            long lockId = -1L;
+            if(rl != null) {
+              lockId = rl.getLockId();
+            }
             server.deleteFamily(location.getRegionInfo().getRegionName(), row,
-              family, timestamp);
+              family, timestamp, lockId);
             return null;
           }
         }
     );
@@ -1178,12 +1239,28 @@ public class HTable {
    * @throws IOException
    */
   public synchronized void commit(final BatchUpdate batchUpdate)
+  throws IOException {
+    commit(batchUpdate,null);
+  }
+
+  /**
+   * Commit a BatchUpdate to the table using existing row lock.
+   * @param batchUpdate
+   * @param rl Existing row lock
+   * @throws IOException
+   */
+  public synchronized void commit(final BatchUpdate batchUpdate,
+    final RowLock rl)
   throws IOException {
     connection.getRegionServerWithRetries(
       new ServerCallable(connection, tableName, batchUpdate.getRow()) {
         public Boolean call() throws IOException {
+          long lockId = -1L;
+          if(rl != null) {
+            lockId = rl.getLockId();
+          }
           server.batchUpdate(location.getRegionInfo().getRegionName(),
-            batchUpdate);
+            batchUpdate, lockId);
           return null;
         }
       }
     );
@@ -1198,7 +1275,45 @@ public class HTable {
   public synchronized void commit(final List batchUpdates)
   throws IOException {
     for (BatchUpdate batchUpdate : batchUpdates)
-      commit(batchUpdate);
+      commit(batchUpdate,null);
+  }
+
+  /**
+   * Obtain a row lock
+   * @param row The row to lock
+   * @return rowLock RowLock containing row and lock id
+   * @throws IOException
+   */
+  public RowLock lockRow(final byte [] row)
+  throws IOException {
+    return connection.getRegionServerWithRetries(
+      new ServerCallable(connection, tableName, row) {
+        public RowLock call() throws IOException {
+          long lockId =
+            server.lockRow(location.getRegionInfo().getRegionName(), row);
+          RowLock rowLock = new RowLock(row,lockId);
+          return rowLock;
+        }
+      }
+    );
+  }
+
+  /**
+   * Release a row lock
+   * @param rl The row lock to release
+   * @throws IOException
+   */
+  public void unlockRow(final RowLock rl)
+  throws IOException {
+    connection.getRegionServerWithRetries(
+      new ServerCallable(connection, tableName, rl.getRow()) {
+        public Boolean call() throws IOException {
+          server.unlockRow(location.getRegionInfo().getRegionName(),
+            rl.getLockId());
+          return null;
+        }
+      }
+    );
   }
 
   /**
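For reviewers, a minimal sketch of how a client might drive the explicit row-lock API added to HTable above. The table handle, column family and row key are made-up names, imports are omitted, and the RowLock class referenced here is not shown in this portion of the diff:

    // Illustrative only; not part of this patch.
    void updateUnderExplicitLock(final HTable table) throws IOException {
      final byte [] row = Bytes.toBytes("row-to-lock");
      // Take the lock explicitly; it is held until unlockRow() or lease expiry.
      RowLock lock = table.lockRow(row);
      try {
        // Operations handed the lock reuse it instead of taking their own.
        BatchUpdate b = new BatchUpdate(row);
        b.put(Bytes.toBytes("info:balance"), Bytes.toBytes("100"));
        table.commit(b, lock);
        // Read back under the same lock.
        RowResult latest = table.getRow(row,
          new byte [][] { Bytes.toBytes("info:balance") },
          HConstants.LATEST_TIMESTAMP, lock);
      } finally {
        // Always release; otherwise the row stays locked until the lease times out.
        table.unlockRow(lock);
      }
    }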
diff --git a/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java b/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
index 0aed6af9911..46c6afc3d65 100644
--- a/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
+++ b/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
@@ -111,11 +111,12 @@ public interface HRegionInterface extends VersionedProtocol {
    *
    * @param regionName region name
    * @param row row key
+   * @param lockId lock id
    * @return map of values
    * @throws IOException
    */
   public RowResult getRow(final byte [] regionName, final byte [] row,
-    final byte[][] columns, final long ts)
+    final byte[][] columns, final long ts, final long lockId)
   throws IOException;
 
   /**
@@ -123,9 +124,11 @@ public interface HRegionInterface extends VersionedProtocol {
    *
    * @param regionName name of the region to update
    * @param b BatchUpdate
+   * @param lockId lock id
    * @throws IOException
    */
-  public void batchUpdate(final byte [] regionName, final BatchUpdate b)
+  public void batchUpdate(final byte [] regionName, final BatchUpdate b,
+    final long lockId)
   throws IOException;
 
   /**
@@ -136,10 +139,11 @@
    * @param row row key
    * @param column column key
    * @param timestamp Delete all entries that have this timestamp or older
+   * @param lockId lock id
    * @throws IOException
    */
   public void deleteAll(byte [] regionName, byte [] row, byte [] column,
-    long timestamp)
+    long timestamp, long lockId)
   throws IOException;
 
   /**
@@ -149,9 +153,11 @@
    * @param regionName region name
    * @param row row key
    * @param timestamp Delete all entries that have this timestamp or older
+   * @param lockId lock id
    * @throws IOException
    */
-  public void deleteAll(byte [] regionName, byte [] row, long timestamp)
+  public void deleteAll(byte [] regionName, byte [] row, long timestamp,
+    long lockId)
   throws IOException;
 
   /**
@@ -162,9 +168,10 @@
    * @param row The row to operate on
    * @param family The column family to match
    * @param timestamp Timestamp to match
+   * @param lockId lock id
    */
   public void deleteFamily(byte [] regionName, byte [] row, byte [] family,
-    long timestamp)
+    long timestamp, long lockId)
   throws IOException;
 
@@ -207,4 +214,24 @@
    * @throws IOException
    */
   public void close(long scannerId) throws IOException;
+
+  /**
+   * Opens a remote row lock.
+   *
+   * @param regionName name of region
+   * @param row row to lock
+   * @return lockId lock identifier
+   * @throws IOException
+   */
+  public long lockRow(final byte [] regionName, final byte [] row)
+  throws IOException;
+
+  /**
+   * Releases a remote row lock.
+   *
+   * @param lockId the lock id returned by lockRow
+   * @throws IOException
+   */
+  public void unlockRow(final byte [] regionName, final long lockId)
+  throws IOException;
 }
\ No newline at end of file
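All of the RPC signatures above carry a plain long lockId, with -1L serving as the sentinel for "no explicit lock held"; that is the value the master-side callers below pass, and HTable converts a RowLock to it inline. A hypothetical helper, not part of this patch, that captures the convention:

    // Hypothetical convenience method: map an optional client-side RowLock to the
    // wire-level lockId expected by HRegionInterface. -1L means "no explicit lock".
    private static long toLockId(final RowLock rl) {
      return rl == null ? -1L : rl.getLockId();
    }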
diff --git a/src/java/org/apache/hadoop/hbase/master/BaseScanner.java b/src/java/org/apache/hadoop/hbase/master/BaseScanner.java
index 22d900369a8..5f17ea2b62f 100644
--- a/src/java/org/apache/hadoop/hbase/master/BaseScanner.java
+++ b/src/java/org/apache/hadoop/hbase/master/BaseScanner.java
@@ -332,7 +332,7 @@ abstract class BaseScanner extends Chore implements HConstants {
 
     BatchUpdate b = new BatchUpdate(parent);
     b.delete(splitColumn);
-    srvr.batchUpdate(metaRegionName, b);
+    srvr.batchUpdate(metaRegionName, b, -1L);
     return result;
   }
 
diff --git a/src/java/org/apache/hadoop/hbase/master/ChangeTableState.java b/src/java/org/apache/hadoop/hbase/master/ChangeTableState.java
index 48670a56caf..4bea278d794 100644
--- a/src/java/org/apache/hadoop/hbase/master/ChangeTableState.java
+++ b/src/java/org/apache/hadoop/hbase/master/ChangeTableState.java
@@ -91,7 +91,7 @@ class ChangeTableState extends TableOperation {
       updateRegionInfo(b, i);
       b.delete(COL_SERVER);
       b.delete(COL_STARTCODE);
-      server.batchUpdate(m.getRegionName(), b);
+      server.batchUpdate(m.getRegionName(), b, -1L);
       if (LOG.isDebugEnabled()) {
         LOG.debug("updated columns in row: " + i.getRegionNameAsString());
       }
diff --git a/src/java/org/apache/hadoop/hbase/master/ColumnOperation.java b/src/java/org/apache/hadoop/hbase/master/ColumnOperation.java
index dae8405949e..94d31ad4691 100644
--- a/src/java/org/apache/hadoop/hbase/master/ColumnOperation.java
+++ b/src/java/org/apache/hadoop/hbase/master/ColumnOperation.java
@@ -52,7 +52,7 @@ abstract class ColumnOperation extends TableOperation {
   throws IOException {
     BatchUpdate b = new BatchUpdate(i.getRegionName());
     b.put(COL_REGIONINFO, Writables.getBytes(i));
-    server.batchUpdate(regionName, b);
+    server.batchUpdate(regionName, b, -1L);
     if (LOG.isDebugEnabled()) {
       LOG.debug("updated columns in row: " + i.getRegionNameAsString());
     }
diff --git a/src/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java b/src/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java
index c7a15999d49..ee1b915fdaf 100644
--- a/src/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java
+++ b/src/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java
@@ -52,7 +52,7 @@ class ModifyTableMeta extends TableOperation {
   throws IOException {
     BatchUpdate b = new BatchUpdate(i.getRegionName());
     b.put(COL_REGIONINFO, Writables.getBytes(i));
-    server.batchUpdate(regionName, b);
+    server.batchUpdate(regionName, b, -1L);
     LOG.debug("updated HTableDescriptor for region " +
       i.getRegionNameAsString());
   }
diff --git a/src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java b/src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java
index aecced36305..022c03dc729 100644
--- a/src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java
+++ b/src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java
@@ -83,7 +83,7 @@ class ProcessRegionOpen extends ProcessRegionStatusChange {
       BatchUpdate b = new BatchUpdate(regionInfo.getRegionName());
       b.put(COL_SERVER, Bytes.toBytes(serverAddress.toString()));
       b.put(COL_STARTCODE, startCode);
-      server.batchUpdate(metaRegionName, b);
+      server.batchUpdate(metaRegionName, b, -1L);
       if (!this.historian.isOnline()) {
         // This is safest place to do the onlining of the historian in
         // the master. When we get to here, we know there is a .META.
diff --git a/src/java/org/apache/hadoop/hbase/master/RegionManager.java b/src/java/org/apache/hadoop/hbase/master/RegionManager.java
index be76b8b33f6..919d4d33ffe 100644
--- a/src/java/org/apache/hadoop/hbase/master/RegionManager.java
+++ b/src/java/org/apache/hadoop/hbase/master/RegionManager.java
@@ -560,7 +560,7 @@ class RegionManager implements HConstants {
     byte [] regionName = region.getRegionName();
     BatchUpdate b = new BatchUpdate(regionName);
     b.put(COL_REGIONINFO, Writables.getBytes(info));
-    server.batchUpdate(metaRegionName, b);
+    server.batchUpdate(metaRegionName, b, -1L);
 
     // 4. Close the new region to flush it to disk. Close its log file too.
     region.close();
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 3363317f4a3..a15a7bc68f1 100644
--- a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1,4 +1,4 @@
-/**
+ /**
  * Copyright 2007 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -1170,7 +1170,7 @@ public class HRegion implements HConstants {
    * @throws IOException
    */
   public Map getFull(final byte [] row,
-      final Set columns, final long ts)
+      final Set columns, final long ts, final Integer lockid)
   throws IOException {
     // Check columns passed
     if (columns != null) {
@@ -1179,7 +1179,7 @@
       }
     }
     HStoreKey key = new HStoreKey(row, ts);
-    Integer lid = obtainRowLock(row);
+    Integer lid = getLock(lockid,row);
     HashSet storeSet = new HashSet();
     try {
       TreeMap result =
@@ -1215,7 +1215,7 @@
       return result;
     } finally {
-      releaseRowLock(lid);
+      if(lockid == null) releaseRowLock(lid);
     }
   }
 
@@ -1347,7 +1347,7 @@
    * @param b
    * @throws IOException
    */
-  public void batchUpdate(BatchUpdate b)
+  public void batchUpdate(BatchUpdate b, Integer lockid)
   throws IOException {
     checkReadOnly();
@@ -1363,7 +1363,8 @@
     // See HRegionServer#RegionListener for how the expire on HRegionServer
     // invokes a HRegion#abort.
     byte [] row = b.getRow();
-    Integer lid = obtainRowLock(row);
+    // If we did not pass an existing row lock, obtain a new one
+    Integer lid = getLock(lockid,row);
     long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP) ?
       System.currentTimeMillis() : b.getTimestamp();
     try {
@@ -1408,7 +1409,7 @@
       this.targetColumns.remove(Long.valueOf(lid));
       throw e;
     } finally {
-      releaseRowLock(lid);
+      if(lockid == null) releaseRowLock(lid);
     }
   }
 
@@ -1458,17 +1459,19 @@
    * @param row
    * @param column
    * @param ts Delete all entries that have this timestamp or older
+   * @param lockid Row lock
    * @throws IOException
    */
-  public void deleteAll(final byte [] row, final byte [] column, final long ts)
+  public void deleteAll(final byte [] row, final byte [] column, final long ts,
+    final Integer lockid)
   throws IOException {
     checkColumn(column);
     checkReadOnly();
-    Integer lid = obtainRowLock(row);
+    Integer lid = getLock(lockid,row);
     try {
       deleteMultiple(row, column, ts, ALL_VERSIONS);
     } finally {
-      releaseRowLock(lid);
+      if(lockid == null) releaseRowLock(lid);
     }
   }
 
@@ -1476,12 +1479,14 @@
    * Delete all cells of the same age as the passed timestamp or older.
    * @param row
    * @param ts Delete all entries that have this timestamp or older
+   * @param lockid Row lock
    * @throws IOException
    */
-  public void deleteAll(final byte [] row, final long ts)
+  public void deleteAll(final byte [] row, final long ts,
+    final Integer lockid)
   throws IOException {
     checkReadOnly();
-    Integer lid = obtainRowLock(row);
+    Integer lid = getLock(lockid,row);
     try {
       for (HStore store : stores.values()){
         List keys = store.getKeys(new HStoreKey(row, ts),
@@ -1493,7 +1498,7 @@
         update(edits);
       }
     } finally {
-      releaseRowLock(lid);
+      if(lockid == null) releaseRowLock(lid);
     }
   }
 
@@ -1504,12 +1509,14 @@
    * @param row The row to operate on
    * @param family The column family to match
    * @param timestamp Timestamp to match
+   * @param lockid Row lock
    * @throws IOException
    */
-  public void deleteFamily(byte [] row, byte [] family, long timestamp)
+  public void deleteFamily(byte [] row, byte [] family, long timestamp,
+    final Integer lockid)
   throws IOException{
     checkReadOnly();
-    Integer lid = obtainRowLock(row);
+    Integer lid = getLock(lockid,row);
     try {
       // find the HStore for the column family
       HStore store = getStore(family);
@@ -1522,7 +1529,7 @@
       }
       update(edits);
     } finally {
-      releaseRowLock(lid);
+      if(lockid == null) releaseRowLock(lid);
     }
   }
 
@@ -1552,7 +1559,7 @@
         update(edits);
       }
     }
-  
+
   /**
    * @throws IOException Throws exception if region is in read-only mode.
    */
@@ -1778,6 +1785,41 @@
     }
   }
 
+  /**
+   * See if row is currently locked.
+   * @param lockid
+   * @return boolean
+   */
+  private boolean isRowLocked(final Integer lockid) {
+    synchronized (locksToRows) {
+      if(locksToRows.containsKey(lockid)) {
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  /**
+   * Returns existing row lock if found, otherwise
+   * obtains a new row lock and returns it.
+   * @param lockid
+   * @return lockid
+   */
+  private Integer getLock(Integer lockid, byte [] row)
+  throws IOException {
+    Integer lid = null;
+    if(lockid == null) {
+      lid = obtainRowLock(row);
+    } else {
+      if(!isRowLocked(lockid)) {
+        throw new IOException("Invalid row lock");
+      }
+      lid = lockid;
+    }
+    return lid;
+  }
+
   private void waitOnRowLocks() {
     synchronized (locksToRows) {
       while (this.locksToRows.size() > 0) {
@@ -2134,7 +2176,8 @@
   public static void removeRegionFromMETA(final HRegionInterface srvr,
       final byte [] metaRegionName, final byte [] regionName)
   throws IOException {
-    srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP);
+    srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP,
+      (long)-1L);
   }
 
   /**
@@ -2155,7 +2198,7 @@
     b.delete(COL_STARTCODE);
     // If carrying splits, they'll be in place when we show up on new
     // server.
-    srvr.batchUpdate(metaRegionName, b);
+    srvr.batchUpdate(metaRegionName, b, (long)-1L);
   }
 
   /**
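Every mutating HRegion method in this patch now follows the same borrow-or-acquire shape around getLock(): reuse a caller-supplied lock id, or take a row lock of its own and release only what it acquired. A standalone sketch of that pattern (simplified bookkeeping, not HBase code; HRegion's real obtainRowLock also blocks until the row is free):

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    public class BorrowOrAcquireSketch {
      private final Map<Integer, byte[]> locksToRows = new HashMap<Integer, byte[]>();
      private int nextLockId = 0;

      private synchronized Integer obtainRowLock(byte [] row) {
        Integer lid = Integer.valueOf(++nextLockId);
        locksToRows.put(lid, row);
        return lid;
      }

      private synchronized void releaseRowLock(Integer lid) {
        locksToRows.remove(lid);
      }

      private synchronized boolean isRowLocked(Integer lockid) {
        return locksToRows.containsKey(lockid);
      }

      // Mirrors HRegion.getLock(): null means "acquire", non-null must still be valid.
      private Integer getLock(Integer lockid, byte [] row) throws IOException {
        if (lockid == null) {
          return obtainRowLock(row);
        }
        if (!isRowLocked(lockid)) {
          throw new IOException("Invalid row lock");
        }
        return lockid;
      }

      // The shape shared by batchUpdate, deleteAll, deleteFamily and getFull above.
      public void mutateRow(byte [] row, Integer lockid) throws IOException {
        Integer lid = getLock(lockid, row);
        try {
          // ... apply the edit while the row lock is held ...
        } finally {
          if (lockid == null) {  // only release a lock this call obtained itself
            releaseRowLock(lid);
          }
        }
      }
    }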
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 85be1d8be54..9024e5987ea 100644
--- a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionHistorian;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.UnknownRowLockException;
 import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.io.BatchOperation;
@@ -1048,7 +1049,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
 
   /** {@inheritDoc} */
   public RowResult getRow(final byte [] regionName, final byte [] row,
-    final byte [][] columns, final long ts)
+    final byte [][] columns, final long ts, final long lockId)
   throws IOException {
     checkOpen();
     requestCount.incrementAndGet();
@@ -1061,7 +1062,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       }
 
       HRegion region = getRegion(regionName);
-      Map map = region.getFull(row, columnSet, ts);
+      Map map = region.getFull(row, columnSet, ts,
+        getLockFromId(lockId));
       HbaseMapWritable result = new HbaseMapWritable();
       result.putAll(map);
@@ -1126,7 +1128,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
   }
 
   /** {@inheritDoc} */
-  public void batchUpdate(final byte [] regionName, BatchUpdate b)
+  public void batchUpdate(final byte [] regionName, BatchUpdate b, long lockId)
   throws IOException {
     checkOpen();
     this.requestCount.incrementAndGet();
@@ -1134,7 +1136,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     validateValuesLength(b, region);
     try {
       cacheFlusher.reclaimMemcacheMemory();
-      region.batchUpdate(b);
+      region.batchUpdate(b, getLockFromId(lockId));
     } catch (OutOfMemoryError error) {
       abort();
       LOG.fatal("Ran out of memory", error);
@@ -1239,7 +1241,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
   }
 
   Map scanners =
-    Collections.synchronizedMap(new HashMap());
+    new ConcurrentHashMap();
 
   /**
    * Instantiated as a scanner lease.
@@ -1275,26 +1277,157 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
 
   /** {@inheritDoc} */
   public void deleteAll(final byte [] regionName, final byte [] row,
-      final byte [] column, final long timestamp)
+      final byte [] column, final long timestamp, final long lockId)
   throws IOException {
     HRegion region = getRegion(regionName);
-    region.deleteAll(row, column, timestamp);
+    region.deleteAll(row, column, timestamp, getLockFromId(lockId));
   }
 
   /** {@inheritDoc} */
   public void deleteAll(final byte [] regionName, final byte [] row,
-      final long timestamp)
+      final long timestamp, final long lockId)
   throws IOException {
     HRegion region = getRegion(regionName);
-    region.deleteAll(row, timestamp);
+    region.deleteAll(row, timestamp, getLockFromId(lockId));
   }
 
   /** {@inheritDoc} */
   public void deleteFamily(byte [] regionName, byte [] row, byte [] family,
-    long timestamp) throws IOException{
-    getRegion(regionName).deleteFamily(row, family, timestamp);
+    long timestamp, final long lockId)
+  throws IOException{
+    getRegion(regionName).deleteFamily(row, family, timestamp,
+      getLockFromId(lockId));
   }
 
+  /** {@inheritDoc} */
+  public long lockRow(byte [] regionName, byte [] row)
+  throws IOException {
+    checkOpen();
+    NullPointerException npe = null;
+    if(regionName == null) {
+      npe = new NullPointerException("regionName is null");
+    } else if(row == null) {
+      npe = new NullPointerException("row to lock is null");
+    }
+    if(npe != null) {
+      IOException io = new IOException("Invalid arguments to lockRow");
+      io.initCause(npe);
+      throw io;
+    }
+    requestCount.incrementAndGet();
+    try {
+      HRegion region = getRegion(regionName);
+      Integer r = region.obtainRowLock(row);
+      long lockId = addRowLock(r,region);
+      LOG.debug("Row lock " + lockId + " explicitly acquired by client");
+      return lockId;
+    } catch (IOException e) {
+      LOG.error("Error obtaining row lock (fsOk: " + this.fsOk + ")",
+        RemoteExceptionHandler.checkIOException(e));
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  protected long addRowLock(Integer r, HRegion region) throws LeaseStillHeldException {
+    long lockId = -1L;
+    lockId = rand.nextLong();
+    String lockName = String.valueOf(lockId);
+    synchronized(rowlocks) {
+      rowlocks.put(lockName, r);
+    }
+    this.leases.
+      createLease(lockName, new RowLockListener(lockName, region));
+    return lockId;
+  }
+
+  /**
+   * Method to get the Integer lock identifier used internally
+   * from the long lock identifier used by the client.
+   * @param lockId long row lock identifier from client
+   * @return intId Integer row lock used internally in HRegion
+   * @throws IOException Thrown if this is not a valid client lock id.
+   */
+  private Integer getLockFromId(long lockId)
+  throws IOException {
+    if(lockId == -1L) {
+      return null;
+    }
+    String lockName = String.valueOf(lockId);
+    Integer rl = null;
+    synchronized(rowlocks) {
+      rl = rowlocks.get(lockName);
+    }
+    if(rl == null) {
+      throw new IOException("Invalid row lock");
+    }
+    this.leases.renewLease(lockName);
+    return rl;
+  }
+
+  /** {@inheritDoc} */
+  public void unlockRow(byte [] regionName, long lockId)
+  throws IOException {
+    checkOpen();
+    NullPointerException npe = null;
+    if(regionName == null) {
+      npe = new NullPointerException("regionName is null");
+    } else if(lockId == -1L) {
+      npe = new NullPointerException("lockId is null");
+    }
+    if(npe != null) {
+      IOException io = new IOException("Invalid arguments to unlockRow");
+      io.initCause(npe);
+      throw io;
+    }
+    requestCount.incrementAndGet();
+    try {
+      HRegion region = getRegion(regionName);
+      String lockName = String.valueOf(lockId);
+      Integer r = null;
+      synchronized(rowlocks) {
+        r = rowlocks.remove(lockName);
+      }
+      if(r == null) {
+        throw new UnknownRowLockException(lockName);
+      }
+      region.releaseRowLock(r);
+      this.leases.cancelLease(lockName);
+      LOG.debug("Row lock " + lockId + " has been explicitly released by client");
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  Map rowlocks =
+    new ConcurrentHashMap();
+
+  /**
+   * Instantiated as a row lock lease.
+   * If the lease times out, the row lock is released
+   */
+  private class RowLockListener implements LeaseListener {
+    private final String lockName;
+    private final HRegion region;
+
+    RowLockListener(final String lockName, final HRegion region) {
+      this.lockName = lockName;
+      this.region = region;
+    }
+
+    /** {@inheritDoc} */
+    public void leaseExpired() {
+      LOG.info("Row Lock " + this.lockName + " lease expired");
+      Integer r = null;
+      synchronized(rowlocks) {
+        r = rowlocks.remove(this.lockName);
+      }
+      if(r != null) {
+        region.releaseRowLock(r);
+      }
+    }
+  }
 
   /**
    * @return Info on this server.
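On the region server, an explicit lock is handed to the client as a random long id and guarded by a lease: getLockFromId() renews the lease on every use, unlockRow() cancels it, and RowLockListener frees the row if the client disappears. A standalone model of that bookkeeping (it uses a ScheduledExecutorService in place of HBase's Leases class and omits renewal for brevity):

    import java.util.Random;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class LeasedRowLocksSketch {
      private final ConcurrentHashMap<String, Integer> rowlocks =
        new ConcurrentHashMap<String, Integer>();
      private final ScheduledExecutorService expiryTimer =
        Executors.newSingleThreadScheduledExecutor();
      private final Random rand = new Random();

      /** lockRow() side: register the internal lock and hand back an opaque id. */
      public long add(final Integer internalLock, final Runnable releaseRowLock, long leaseMs) {
        final long lockId = rand.nextLong();
        final String lockName = String.valueOf(lockId);
        rowlocks.put(lockName, internalLock);
        expiryTimer.schedule(new Runnable() {
          public void run() {
            // Same idea as RowLockListener.leaseExpired(): free abandoned locks.
            if (rowlocks.remove(lockName) != null) {
              releaseRowLock.run();
            }
          }
        }, leaseMs, TimeUnit.MILLISECONDS);
        return lockId;
      }

      /** unlockRow() side: an explicit release wins the race with the expiry task. */
      public Integer remove(long lockId) {
        return rowlocks.remove(String.valueOf(lockId));
      }
    }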
diff --git a/src/java/org/apache/hadoop/hbase/util/Merge.java b/src/java/org/apache/hadoop/hbase/util/Merge.java
index 541fae3ab09..6598770af10 100644
--- a/src/java/org/apache/hadoop/hbase/util/Merge.java
+++ b/src/java/org/apache/hadoop/hbase/util/Merge.java
@@ -308,7 +308,7 @@ public class Merge extends Configured implements Tool {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Removing region: " + regioninfo + " from " + meta);
     }
-    meta.deleteAll(regioninfo.getRegionName(), System.currentTimeMillis());
+    meta.deleteAll(regioninfo.getRegionName(), System.currentTimeMillis(), null);
   }
 
   /*
diff --git a/src/java/org/apache/hadoop/hbase/util/MetaUtils.java b/src/java/org/apache/hadoop/hbase/util/MetaUtils.java
index c6f8458c465..b84454447d4 100644
--- a/src/java/org/apache/hadoop/hbase/util/MetaUtils.java
+++ b/src/java/org/apache/hadoop/hbase/util/MetaUtils.java
@@ -407,7 +407,7 @@ public class MetaUtils {
     }
     BatchUpdate b = new BatchUpdate(hri.getRegionName());
     b.put(HConstants.COL_REGIONINFO, Writables.getBytes(hri));
-    r.batchUpdate(b);
+    r.batchUpdate(b, null);
     if (LOG.isDebugEnabled()) {
       HRegionInfo h = Writables.getHRegionInfoOrNull(
           r.get(hri.getRegionName(), HConstants.COL_REGIONINFO).getValue());
diff --git a/src/java/org/apache/hadoop/hbase/util/migration/v5/HRegion.java b/src/java/org/apache/hadoop/hbase/util/migration/v5/HRegion.java
index 7efbd7097a2..d117fca0470 100644
--- a/src/java/org/apache/hadoop/hbase/util/migration/v5/HRegion.java
+++ b/src/java/org/apache/hadoop/hbase/util/migration/v5/HRegion.java
@@ -2050,7 +2050,8 @@ public class HRegion implements HConstants {
   public static void removeRegionFromMETA(final HRegionInterface srvr,
       final byte [] metaRegionName, final byte [] regionName)
   throws IOException {
-    srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP);
+    srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP,
+      -1L);
   }
 
   /**
@@ -2071,7 +2072,7 @@ public class HRegion implements HConstants {
     b.delete(COL_STARTCODE);
     // If carrying splits, they'll be in place when we show up on new
     // server.
-    srvr.batchUpdate(metaRegionName, b);
+    srvr.batchUpdate(metaRegionName, b, -1L);
   }
 
   /**