HADOOP-1710 All updates should be batch updates

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@565993 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jim Kellerman 2007-08-15 01:12:29 +00:00
parent 0c7ac6795f
commit 391379a6fb
11 changed files with 292 additions and 463 deletions

View File

@ -412,7 +412,7 @@ public class HClient implements HConstants {
if(this.table.get() == null) {
throw new IllegalStateException("Must open table first");
}
return this.table.get().startBatchUpdate(row);
return this.table.get().startUpdate(row);
}
/**
@ -423,7 +423,7 @@ public class HClient implements HConstants {
if(this.table.get() == null) {
throw new IllegalStateException("Must open table first");
}
this.table.get().abortBatch(lockid);
this.table.get().abort(lockid);
}
/**
@ -448,7 +448,7 @@ public class HClient implements HConstants {
if(this.table.get() == null) {
throw new IllegalStateException("Must open table first");
}
this.table.get().commitBatch(lockid, timestamp);
this.table.get().commit(lockid, timestamp);
}
/**
@ -464,9 +464,8 @@ public class HClient implements HConstants {
*
* @param row Name of row to start update against.
* @return Row lockid.
* @throws IOException
*/
public long startUpdate(final Text row) throws IOException {
public long startUpdate(final Text row) {
if(this.table.get() == null) {
throw new IllegalStateException("Must open table first");
}
@ -480,9 +479,8 @@ public class HClient implements HConstants {
* @param lockid lock id returned from startUpdate
* @param column column whose value is being set
* @param val new value for column
* @throws IOException
*/
public void put(long lockid, Text column, byte val[]) throws IOException {
public void put(long lockid, Text column, byte val[]) {
if(this.table.get() == null) {
throw new IllegalStateException("Must open table first");
}
@ -494,9 +492,8 @@ public class HClient implements HConstants {
*
* @param lockid - lock id returned from startUpdate
* @param column - name of column whose value is to be deleted
* @throws IOException
*/
public void delete(long lockid, Text column) throws IOException {
public void delete(long lockid, Text column) {
if(this.table.get() == null) {
throw new IllegalStateException("Must open table first");
}
@ -507,9 +504,8 @@ public class HClient implements HConstants {
* Abort a row mutation
*
* @param lockid - lock id returned from startUpdate
* @throws IOException
*/
public void abort(long lockid) throws IOException {
public void abort(long lockid) {
if(this.table.get() == null) {
throw new IllegalStateException("Must open table first");
}
@ -544,13 +540,11 @@ public class HClient implements HConstants {
* Renew lease on update
*
* @param lockid - lock id returned from startUpdate
* @throws IOException
*/
public void renewLease(long lockid) throws IOException {
public void renewLease(@SuppressWarnings("unused") long lockid) {
if(this.table.get() == null) {
throw new IllegalStateException("Must open table first");
}
this.table.get().renewLease(lockid);
}
private void printUsage() {

View File

@ -502,7 +502,7 @@ public class HConnectionManager implements HConstants {
} catch (IOException e) {
if (tries < numRetries - 1) {
findServersForTable(META_TABLE_NAME);
metaServers = findServersForTable(META_TABLE_NAME);
success = false;
break;
}

View File

@ -1796,21 +1796,20 @@ HMasterRegionInterface, Runnable {
// Remove server from root/meta entries
long clientId = rand.nextLong();
for (ToDoEntry e: toDoList) {
long lockid = server.startUpdate(regionName, clientId, e.row);
BatchUpdate b = new BatchUpdate();
long lockid = b.startUpdate(e.row);
if (e.deleteRegion) {
server.delete(regionName, clientId, lockid, COL_REGIONINFO);
b.delete(lockid, COL_REGIONINFO);
} else if (e.regionOffline) {
e.info.offLine = true;
server.put(regionName, clientId, lockid, COL_REGIONINFO,
Writables.getBytes(e.info));
b.put(lockid, COL_REGIONINFO, Writables.getBytes(e.info));
}
server.delete(regionName, clientId, lockid, COL_SERVER);
server.delete(regionName, clientId, lockid, COL_STARTCODE);
server.commit(regionName, clientId, lockid, System.currentTimeMillis());
b.delete(lockid, COL_SERVER);
b.delete(lockid, COL_STARTCODE);
server.batchUpdate(regionName, System.currentTimeMillis(), b);
}
// Get regions reassigned
@ -2053,23 +2052,20 @@ HMasterRegionInterface, Runnable {
server = connection.getHRegionConnection(r.server);
}
long clientId = rand.nextLong();
try {
long lockid = server.startUpdate(metaRegionName, clientId,
regionInfo.regionName);
BatchUpdate b = new BatchUpdate();
long lockid = b.startUpdate(regionInfo.regionName);
if (deleteRegion) {
server.delete(metaRegionName, clientId, lockid, COL_REGIONINFO);
b.delete(lockid, COL_REGIONINFO);
} else if (!reassignRegion ) {
regionInfo.offLine = true;
server.put(metaRegionName, clientId, lockid, COL_REGIONINFO,
Writables.getBytes(regionInfo));
b.put(lockid, COL_REGIONINFO, Writables.getBytes(regionInfo));
}
server.delete(metaRegionName, clientId, lockid, COL_SERVER);
server.delete(metaRegionName, clientId, lockid, COL_STARTCODE);
server.commit(metaRegionName, clientId, lockid,
System.currentTimeMillis());
b.delete(lockid, COL_SERVER);
b.delete(lockid, COL_STARTCODE);
server.batchUpdate(metaRegionName, System.currentTimeMillis(), b);
break;
@ -2199,18 +2195,15 @@ HMasterRegionInterface, Runnable {
LOG.info("updating row " + region.getRegionName() + " in table " +
metaRegionName);
long clientId = rand.nextLong();
try {
long lockid = server.startUpdate(metaRegionName, clientId,
region.getRegionName());
BatchUpdate b = new BatchUpdate();
long lockid = b.startUpdate(region.getRegionName());
server.put(metaRegionName, clientId, lockid, COL_SERVER,
b.put(lockid, COL_SERVER,
Writables.stringToBytes(serverAddress.toString()));
server.put(metaRegionName, clientId, lockid, COL_STARTCODE, startCode);
server.commit(metaRegionName, clientId, lockid,
System.currentTimeMillis());
b.put(lockid, COL_STARTCODE, startCode);
server.batchUpdate(metaRegionName, System.currentTimeMillis(), b);
if (region.tableDesc.getName().equals(META_TABLE_NAME)) {
// It's a meta region.
@ -2335,11 +2328,11 @@ HMasterRegionInterface, Runnable {
newRegion.getTableDesc().getName()).lastKey()));
Text metaRegionName = m.regionName;
HRegionInterface r = connection.getHRegionConnection(m.server);
long scannerid = r.openScanner(metaRegionName, COL_REGIONINFO_ARRAY,
HRegionInterface server = connection.getHRegionConnection(m.server);
long scannerid = server.openScanner(metaRegionName, COL_REGIONINFO_ARRAY,
tableName, System.currentTimeMillis(), null);
try {
KeyedData[] data = r.next(scannerid);
KeyedData[] data = server.next(scannerid);
// Test data and that the row for the data is for our table. If table
// does not exist, scanner will return row after where our table would
@ -2355,7 +2348,7 @@ HMasterRegionInterface, Runnable {
}
} finally {
r.close(scannerid);
server.close(scannerid);
}
// 2. Create the HRegion
@ -2367,13 +2360,10 @@ HMasterRegionInterface, Runnable {
HRegionInfo info = region.getRegionInfo();
Text regionName = region.getRegionName();
long clientId = rand.nextLong();
long lockid = r.startUpdate(metaRegionName, clientId, regionName);
r.put(metaRegionName, clientId, lockid, COL_REGIONINFO,
Writables.getBytes(info));
r.commit(metaRegionName, clientId, lockid, System.currentTimeMillis());
BatchUpdate b = new BatchUpdate();
long lockid = b.startUpdate(regionName);
b.put(lockid, COL_REGIONINFO, Writables.getBytes(info));
server.batchUpdate(metaRegionName, System.currentTimeMillis(), b);
// 4. Close the new region to flush it to disk. Close its log file too.
@ -2608,7 +2598,6 @@ HMasterRegionInterface, Runnable {
new HashMap<String, HashSet<HRegionInfo>>();
protected long lockid;
protected long clientId;
ChangeTableState(Text tableName, boolean onLine) throws IOException {
super(tableName);
@ -2653,41 +2642,36 @@ HMasterRegionInterface, Runnable {
LOG.debug("updating columns in row: " + i.regionName);
}
lockid = -1L;
clientId = rand.nextLong();
try {
lockid = server.startUpdate(m.regionName, clientId, i.regionName);
updateRegionInfo(server, m.regionName, i);
server.delete(m.regionName, clientId, lockid, COL_SERVER);
server.delete(m.regionName, clientId, lockid, COL_STARTCODE);
server.commit(m.regionName, clientId, lockid,
System.currentTimeMillis());
BatchUpdate b = new BatchUpdate();
lockid = b.startUpdate(i.regionName);
updateRegionInfo(b, i);
b.delete(lockid, COL_SERVER);
b.delete(lockid, COL_STARTCODE);
lockid = -1L;
if (LOG.isDebugEnabled()) {
LOG.debug("updated columns in row: " + i.regionName);
}
} catch (IOException e) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException(
(RemoteException) e);
}
LOG.error("column update failed in row: " + i.regionName, e);
} finally {
for (int tries = 0; tries < numRetries; tries++) {
try {
if (lockid != -1L) {
server.abort(m.regionName, clientId, lockid);
server.batchUpdate(m.regionName, System.currentTimeMillis(), b);
if (LOG.isDebugEnabled()) {
LOG.debug("updated columns in row: " + i.regionName);
}
break;
} catch (IOException iex) {
if (iex instanceof RemoteException) {
iex = RemoteExceptionHandler.decodeRemoteException(
(RemoteException) iex);
} catch (IOException e) {
if (tries == numRetries - 1) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException(
(RemoteException) e);
}
LOG.error("column update failed in row: " + i.regionName, e);
break;
}
LOG.error("", iex);
}
try {
Thread.sleep(threadWakeFrequency);
} catch (InterruptedException e) {
// continue
}
}
@ -2738,12 +2722,11 @@ HMasterRegionInterface, Runnable {
servedRegions.clear();
}
protected void updateRegionInfo(final HRegionInterface server,
final Text regionName, final HRegionInfo i) throws IOException {
protected void updateRegionInfo(final BatchUpdate b, final HRegionInfo i)
throws IOException {
i.offLine = !online;
server.put(regionName, clientId, lockid, COL_REGIONINFO,
Writables.getBytes(i));
b.put(lockid, COL_REGIONINFO, Writables.getBytes(i));
}
}
@ -2790,11 +2773,10 @@ HMasterRegionInterface, Runnable {
}
@Override
protected void updateRegionInfo(
@SuppressWarnings("hiding") HRegionInterface server, Text regionName,
@SuppressWarnings("unused") HRegionInfo i) throws IOException {
protected void updateRegionInfo(BatchUpdate b,
@SuppressWarnings("unused") HRegionInfo i) {
server.delete(regionName, clientId, lockid, COL_REGIONINFO);
b.delete(lockid, COL_REGIONINFO);
}
}
@ -2816,39 +2798,34 @@ HMasterRegionInterface, Runnable {
protected void updateRegionInfo(HRegionInterface server, Text regionName,
HRegionInfo i) throws IOException {
long lockid = -1L;
long clientId = rand.nextLong();
try {
lockid = server.startUpdate(regionName, clientId, i.regionName);
server.put(regionName, clientId, lockid, COL_REGIONINFO,
Writables.getBytes(i));
server.commit(regionName, clientId, lockid, System.currentTimeMillis());
lockid = -1L;
if (LOG.isDebugEnabled()) {
LOG.debug("updated columns in row: " + i.regionName);
}
} catch (Exception e) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
LOG.error("column update failed in row: " + i.regionName, e);
} finally {
if (lockid != -1L) {
try {
server.abort(regionName, clientId, lockid);
} catch (IOException iex) {
if (iex instanceof RemoteException) {
iex = RemoteExceptionHandler.decodeRemoteException(
(RemoteException) iex);
}
LOG.error("", iex);
BatchUpdate b = new BatchUpdate();
long lockid = b.startUpdate(i.regionName);
b.put(lockid, COL_REGIONINFO, Writables.getBytes(i));
for (int tries = 0; tries < numRetries; tries++) {
try {
server.batchUpdate(regionName, System.currentTimeMillis(), b);
if (LOG.isDebugEnabled()) {
LOG.debug("updated columns in row: " + i.regionName);
}
break;
} catch (IOException e) {
if (tries == numRetries - 1) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
LOG.error("column update failed in row: " + i.regionName, e);
break;
}
}
try {
Thread.sleep(threadWakeFrequency);
} catch (InterruptedException e) {
// continue
}
}
}

View File

@ -283,13 +283,8 @@ class HMerge implements HConstants {
LOG.debug("updated columns in row: " + regionsToDelete[r]);
}
} finally {
try {
if(lockid != -1L) {
table.abort(lockid);
}
} catch(IOException iex) {
LOG.error(iex);
if(lockid != -1L) {
table.abort(lockid);
}
}
}
@ -309,13 +304,8 @@ class HMerge implements HConstants {
+ newRegion.getRegionName());
}
} finally {
try {
if(lockid != -1L) {
table.abort(lockid);
}
} catch(IOException iex) {
LOG.error(iex);
if(lockid != -1L) {
table.abort(lockid);
}
}
}

View File

@ -97,7 +97,7 @@ public interface HRegionInterface extends VersionedProtocol {
* @throws IOException
*/
public KeyedData[] getRow(final Text regionName, final Text row)
throws IOException;
throws IOException; //TODO
//////////////////////////////////////////////////////////////////////////////
// Start an atomic row insertion/update. No changes are committed until the
@ -126,7 +126,10 @@ public interface HRegionInterface extends VersionedProtocol {
* @param row Name of row to start update against.
* @return Row lockid.
* @throws IOException
*
* Deprecated. Use @see {@link #batchUpdate(Text, long, BatchUpdate)} instead.
*/
@Deprecated
public long startUpdate(final Text regionName, final long clientid,
final Text row)
throws IOException;
@ -140,7 +143,10 @@ public interface HRegionInterface extends VersionedProtocol {
* @param column column whose value is being set
* @param val new value for column
* @throws IOException
*
* Deprecated. Use @see {@link #batchUpdate(Text, long, BatchUpdate)} instead.
*/
@Deprecated
public void put(final Text regionName, final long clientid, final long lockid,
final Text column, final byte [] val)
throws IOException;
@ -153,7 +159,10 @@ public interface HRegionInterface extends VersionedProtocol {
* @param lockid lock id returned from startUpdate
* @param column name of column whose value is to be deleted
* @throws IOException
*
* Deprecated. Use @see {@link #batchUpdate(Text, long, BatchUpdate)} instead.
*/
@Deprecated
public void delete(final Text regionName, final long clientid,
final long lockid, final Text column)
throws IOException;
@ -165,7 +174,10 @@ public interface HRegionInterface extends VersionedProtocol {
* @param clientid a unique value to identify the client
* @param lockid lock id returned from startUpdate
* @throws IOException
*
* Deprecated. Use @see {@link #batchUpdate(Text, long, BatchUpdate)} instead.
*/
@Deprecated
public void abort(final Text regionName, final long clientid,
final long lockid)
throws IOException;
@ -178,7 +190,10 @@ public interface HRegionInterface extends VersionedProtocol {
* @param lockid lock id returned from startUpdate
* @param timestamp the time (in milliseconds to associate with this change)
* @throws IOException
*
* Deprecated. Use @see {@link #batchUpdate(Text, long, BatchUpdate)} instead.
*/
@Deprecated
public void commit(final Text regionName, final long clientid,
final long lockid, final long timestamp)
throws IOException;
@ -189,7 +204,10 @@ public interface HRegionInterface extends VersionedProtocol {
* @param lockid lock id returned from startUpdate
* @param clientid a unique value to identify the client
* @throws IOException
*
* Deprecated. Use @see {@link #batchUpdate(Text, long, BatchUpdate)} instead.
*/
@Deprecated
public void renewLease(long lockid, long clientid) throws IOException;
//////////////////////////////////////////////////////////////////////////////
@ -229,7 +247,7 @@ public interface HRegionInterface extends VersionedProtocol {
* @return array of values
* @throws IOException
*/
public KeyedData[] next(long scannerId) throws IOException;
public KeyedData[] next(long scannerId) throws IOException; //TODO
/**
* Close a scanner

View File

@ -225,7 +225,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// Remove old region from META
// NOTE: there is no need for retry logic here. HTable does it for us.
long lockid = t.startBatchUpdate(oldRegionInfo.getRegionName());
long lockid = t.startUpdate(oldRegionInfo.getRegionName());
oldRegionInfo.offLine = true;
oldRegionInfo.split = true;
t.put(lockid, COL_REGIONINFO, Writables.getBytes(oldRegionInfo));
@ -235,17 +235,17 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
t.put(lockid, COL_SPLITB, Writables.getBytes(
newRegions[1].getRegionInfo()));
t.commitBatch(lockid);
t.commit(lockid);
// Add new regions to META
for (int i = 0; i < newRegions.length; i++) {
lockid = t.startBatchUpdate(newRegions[i].getRegionName());
lockid = t.startUpdate(newRegions[i].getRegionName());
t.put(lockid, COL_REGIONINFO, Writables.getBytes(
newRegions[i].getRegionInfo()));
t.commitBatch(lockid);
t.commit(lockid);
}
// Now tell the master about the new regions
@ -975,18 +975,14 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// HRegionInterface
//////////////////////////////////////////////////////////////////////////////
/**
* {@inheritDoc}
*/
/** {@inheritDoc} */
public HRegionInfo getRegionInfo(final Text regionName)
throws NotServingRegionException {
requestCount.incrementAndGet();
return getRegion(regionName).getRegionInfo();
}
/**
* {@inheritDoc}
*/
/** {@inheritDoc} */
public void batchUpdate(Text regionName, long timestamp, BatchUpdate b)
throws IOException {
requestCount.incrementAndGet();
@ -1006,9 +1002,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
commit(regionName, clientid, lockid, timestamp);
}
/**
* {@inheritDoc}
*/
/** {@inheritDoc} */
public byte [] get(final Text regionName, final Text row,
final Text column)
throws IOException {
@ -1016,9 +1010,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
return getRegion(regionName).get(row, column);
}
/**
* {@inheritDoc}
*/
/** {@inheritDoc} */
public byte [][] get(final Text regionName, final Text row,
final Text column, final int numVersions)
throws IOException {
@ -1026,18 +1018,14 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
return getRegion(regionName).get(row, column, numVersions);
}
/**
* {@inheritDoc}
*/
/** {@inheritDoc} */
public byte [][] get(final Text regionName, final Text row, final Text column,
final long timestamp, final int numVersions) throws IOException {
requestCount.incrementAndGet();
return getRegion(regionName).get(row, column, timestamp, numVersions);
}
/**
* {@inheritDoc}
*/
/** {@inheritDoc} */
public KeyedData[] getRow(final Text regionName, final Text row)
throws IOException {
requestCount.incrementAndGet();
@ -1052,9 +1040,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
return result;
}
/**
* {@inheritDoc}
*/
/** {@inheritDoc} */
public KeyedData[] next(final long scannerId)
throws IOException {
requestCount.incrementAndGet();
@ -1096,18 +1082,15 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
return values.toArray(new KeyedData[values.size()]);
}
/**
* {@inheritDoc}
/*
* NOTE: When startUpdate, put, delete, abort, commit and renewLease are
* removed from HRegionInterface, these methods (with the exception of
* renewLease) must remain, as they are called by batchUpdate (renewLease
* can just be removed)
*
* However, the remaining methods can become protected instead of public
* at that point.
*/
public long startUpdate(Text regionName, long clientid, Text row)
throws IOException {
requestCount.incrementAndGet();
HRegion region = getRegion(regionName);
long lockid = region.startUpdate(row);
this.leases.createLease(clientid, lockid,
new RegionListener(region, lockid));
return lockid;
}
/** Create a lease for an update. If it times out, the update is aborted */
private static class RegionListener implements LeaseListener {
@ -1119,9 +1102,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
this.localLockId = lockId;
}
/**
* {@inheritDoc}
*/
/** {@inheritDoc} */
public void leaseExpired() {
try {
localRegion.abort(localLockId);
@ -1139,9 +1120,18 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
}
/**
* {@inheritDoc}
*/
/** {@inheritDoc} */
public long startUpdate(Text regionName, long clientid, Text row)
throws IOException {
requestCount.incrementAndGet();
HRegion region = getRegion(regionName);
long lockid = region.startUpdate(row);
this.leases.createLease(clientid, lockid,
new RegionListener(region, lockid));
return lockid;
}
/** {@inheritDoc} */
public void put(final Text regionName, final long clientid,
final long lockid, final Text column, final byte [] val)
throws IOException {
@ -1151,9 +1141,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
region.put(lockid, column, val);
}
/**
* {@inheritDoc}
*/
/** {@inheritDoc} */
public void delete(Text regionName, long clientid, long lockid, Text column)
throws IOException {
requestCount.incrementAndGet();
@ -1162,9 +1150,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
region.delete(lockid, column);
}
/**
* {@inheritDoc}
*/
/** {@inheritDoc} */
public void abort(Text regionName, long clientid, long lockid)
throws IOException {
requestCount.incrementAndGet();
@ -1173,9 +1159,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
region.abort(lockid);
}
/**
* {@inheritDoc}
*/
/** {@inheritDoc} */
public void commit(Text regionName, final long clientid, final long lockid,
final long timestamp) throws IOException {
requestCount.incrementAndGet();
@ -1184,9 +1168,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
region.commit(lockid, timestamp);
}
/**
* {@inheritDoc}
*/
/** {@inheritDoc} */
public void renewLease(long lockid, long clientid) throws IOException {
requestCount.incrementAndGet();
leases.renewLease(clientid, lockid);

View File

@ -52,11 +52,6 @@ public class HTable implements HConstants {
// For row mutation operations
protected volatile long currentLockId;
protected volatile Text currentRegion;
protected volatile HRegionInterface currentServer;
protected volatile long clientid;
protected volatile boolean closed;
protected void checkClosed() {
@ -81,7 +76,6 @@ public class HTable implements HConstants {
this.rand = new Random();
tableServers = connection.getTableServers(tableName);
this.batch = null;
this.currentLockId = -1L;
closed = false;
}
@ -116,10 +110,6 @@ public class HTable implements HConstants {
closed = true;
tableServers = null;
batch = null;
currentLockId = -1L;
currentRegion = null;
currentServer = null;
clientid = -1L;
connection.close(tableName);
}
@ -127,8 +117,28 @@ public class HTable implements HConstants {
* Verifies that no update is in progress
*/
public synchronized void checkUpdateInProgress() {
if (batch != null || currentLockId != -1L) {
throw new IllegalStateException("update in progress");
updateInProgress(false);
}
/*
* Checks to see if an update is in progress
*
* @param updateMustBeInProgress
* If true, an update must be in progress. An IllegalStateException will be
* thrown if not.
*
* If false, an update must not be in progress. An IllegalStateException
* will be thrown if an update is in progress.
*/
private void updateInProgress(boolean updateMustBeInProgress) {
if (updateMustBeInProgress) {
if (batch == null) {
throw new IllegalStateException("no update in progress");
}
} else {
if (batch != null) {
throw new IllegalStateException("update in progress");
}
}
}
@ -415,27 +425,25 @@ public class HTable implements HConstants {
*
* @param row name of row to be updated
* @return lockid to be used in subsequent put, delete and commit calls
*
* Deprecated. Batch operations are now the default. startBatchUpdate is now
* implemented by @see {@link #startUpdate(Text)}
*/
@Deprecated
public synchronized long startBatchUpdate(final Text row) {
checkClosed();
checkUpdateInProgress();
batch = new BatchUpdate();
return batch.startUpdate(row);
return startUpdate(row);
}
/**
* Abort a batch mutation
* @param lockid lock id returned by startBatchUpdate
*
* Deprecated. Batch operations are now the default. abortBatch is now
* implemented by @see {@link #abort(long)}
*/
@Deprecated
public synchronized void abortBatch(final long lockid) {
checkClosed();
if (batch == null) {
throw new IllegalStateException("no batch update in progress");
}
if (batch.getLockid() != lockid) {
throw new IllegalArgumentException("invalid lock id " + lockid);
}
batch = null;
abort(lockid);
}
/**
@ -443,9 +451,13 @@ public class HTable implements HConstants {
*
* @param lockid lock id returned by startBatchUpdate
* @throws IOException
*
* Deprecated. Batch operations are now the default. commitBatch(long) is now
* implemented by @see {@link #commit(long)}
*/
@Deprecated
public void commitBatch(final long lockid) throws IOException {
commitBatch(lockid, System.currentTimeMillis());
commit(lockid, System.currentTimeMillis());
}
/**
@ -454,14 +466,99 @@ public class HTable implements HConstants {
* @param lockid lock id returned by startBatchUpdate
* @param timestamp time to associate with all the changes
* @throws IOException
*
* Deprecated. Batch operations are now the default. commitBatch(long, long)
* is now implemented by @see {@link #commit(long, long)}
*/
@Deprecated
public synchronized void commitBatch(final long lockid, final long timestamp)
throws IOException {
commit(lockid, timestamp);
}
/**
* Start an atomic row insertion/update. No changes are committed until the
* call to commit() returns.
*
* A call to abort() will abandon any updates in progress.
*
*
* @param row Name of row to start update against.
* @return Row lockid.
*/
public synchronized long startUpdate(final Text row) {
checkClosed();
if (batch == null) {
throw new IllegalStateException("no batch update in progress");
updateInProgress(false);
batch = new BatchUpdate();
return batch.startUpdate(row);
}
/**
* Change a value for the specified column.
* Runs {@link #abort(long)} if exception thrown.
*
* @param lockid lock id returned from startUpdate
* @param column column whose value is being set
* @param val new value for column
*/
public void put(long lockid, Text column, byte val[]) {
checkClosed();
if (val == null) {
throw new IllegalArgumentException("value cannot be null");
}
updateInProgress(true);
batch.put(lockid, column, val);
}
/**
* Delete the value for a column
*
* @param lockid - lock id returned from startUpdate
* @param column - name of column whose value is to be deleted
*/
public void delete(long lockid, Text column) {
checkClosed();
updateInProgress(true);
batch.delete(lockid, column);
}
/**
* Abort a row mutation
*
* @param lockid - lock id returned from startUpdate
*/
public synchronized void abort(long lockid) {
checkClosed();
updateInProgress(true);
if (batch.getLockid() != lockid) {
throw new IllegalArgumentException("invalid lock id " + lockid);
}
batch = null;
}
/**
* Finalize a row mutation
*
* @param lockid - lock id returned from startUpdate
* @throws IOException
*/
public void commit(long lockid) throws IOException {
commit(lockid, System.currentTimeMillis());
}
/**
* Finalize a row mutation
*
* @param lockid - lock id returned from startUpdate
* @param timestamp - time to associate with the change
* @throws IOException
*/
public synchronized void commit(long lockid, long timestamp)
throws IOException {
checkClosed();
updateInProgress(true);
if (batch.getLockid() != lockid) {
throw new IllegalArgumentException("invalid lock id " + lockid);
}
@ -482,7 +579,8 @@ public class HTable implements HConstants {
} else {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
e = RemoteExceptionHandler.decodeRemoteException(
(RemoteException) e);
}
throw e;
}
@ -498,246 +596,16 @@ public class HTable implements HConstants {
}
}
/**
* Start an atomic row insertion/update. No changes are committed until the
* call to commit() returns. A call to abort() will abandon any updates in progress.
*
* Callers to this method are given a lease for each unique lockid; before the
* lease expires, either abort() or commit() must be called. If it is not
* called, the system will automatically call abort() on the client's behalf.
*
* The client can gain extra time with a call to renewLease().
* Start an atomic row insertion or update
*
* @param row Name of row to start update against.
* @return Row lockid.
* @throws IOException
*/
public synchronized long startUpdate(final Text row) throws IOException {
checkClosed();
checkUpdateInProgress();
for (int tries = 0; tries < numRetries; tries++) {
IOException e = null;
HRegionLocation info = getRegionLocation(row);
try {
currentServer =
connection.getHRegionConnection(info.getServerAddress());
currentRegion = info.getRegionInfo().getRegionName();
clientid = rand.nextLong();
currentLockId = currentServer.startUpdate(currentRegion, clientid, row);
break;
} catch (IOException ex) {
e = ex;
}
if (tries < numRetries - 1) {
try {
Thread.sleep(this.pause);
} catch (InterruptedException ex) {
}
try {
tableServers = connection.reloadTableServers(tableName);
} catch (IOException ex) {
e = ex;
}
} else {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
throw e;
}
}
return currentLockId;
}
/**
* Change a value for the specified column.
* Runs {@link #abort(long)} if exception thrown.
*
* @param lockid lock id returned from startUpdate
* @param column column whose value is being set
* @param val new value for column
* @throws IOException
*/
public void put(long lockid, Text column, byte val[]) throws IOException {
checkClosed();
if (val == null) {
throw new IllegalArgumentException("value cannot be null");
}
if (batch != null) {
batch.put(lockid, column, val);
return;
}
if (lockid != currentLockId) {
throw new IllegalArgumentException("invalid lockid");
}
try {
this.currentServer.put(this.currentRegion, this.clientid, lockid, column,
val);
} catch (IOException e) {
try {
this.currentServer.abort(this.currentRegion, this.clientid, lockid);
} catch (IOException e2) {
LOG.warn(e2);
}
this.currentServer = null;
this.currentRegion = null;
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
throw e;
}
}
/**
* Delete the value for a column
*
* @param lockid - lock id returned from startUpdate
* @param column - name of column whose value is to be deleted
* @throws IOException
*/
public void delete(long lockid, Text column) throws IOException {
checkClosed();
if (batch != null) {
batch.delete(lockid, column);
return;
}
if (lockid != currentLockId) {
throw new IllegalArgumentException("invalid lockid");
}
try {
this.currentServer.delete(this.currentRegion, this.clientid, lockid,
column);
} catch (IOException e) {
try {
this.currentServer.abort(this.currentRegion, this.clientid, lockid);
} catch(IOException e2) {
LOG.warn(e2);
}
this.currentServer = null;
this.currentRegion = null;
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
throw e;
}
}
/**
* Abort a row mutation
*
* @param lockid - lock id returned from startUpdate
* @throws IOException
*/
public synchronized void abort(long lockid) throws IOException {
checkClosed();
if (batch != null) {
abortBatch(lockid);
return;
}
if (lockid != currentLockId) {
throw new IllegalArgumentException("invalid lockid");
}
try {
try {
this.currentServer.abort(this.currentRegion, this.clientid, lockid);
} catch (IOException e) {
this.currentServer = null;
this.currentRegion = null;
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
throw e;
}
} finally {
currentLockId = -1L;
}
}
/**
* Finalize a row mutation
*
* @param lockid - lock id returned from startUpdate
* @throws IOException
*/
public void commit(long lockid) throws IOException {
commit(lockid, System.currentTimeMillis());
}
/**
* Finalize a row mutation
*
* @param lockid - lock id returned from startUpdate
* @param timestamp - time to associate with the change
* @throws IOException
*/
public synchronized void commit(long lockid, long timestamp) throws IOException {
checkClosed();
if (batch != null) {
commitBatch(lockid, timestamp);
return;
}
if (lockid != currentLockId) {
throw new IllegalArgumentException("invalid lockid");
}
try {
try {
this.currentServer.commit(this.currentRegion, this.clientid, lockid,
timestamp);
} catch (IOException e) {
this.currentServer = null;
this.currentRegion = null;
if(e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
throw e;
}
} finally {
currentLockId = -1L;
}
}
/**
* Renew lease on update
*
* @param lockid - lock id returned from startUpdate
* @throws IOException
*
* Deprecated. Batch updates are now the default. Consequently this method
* does nothing.
*/
public synchronized void renewLease(long lockid) throws IOException {
checkClosed();
if (batch != null) {
return;
}
if (lockid != currentLockId) {
throw new IllegalArgumentException("invalid lockid");
}
try {
this.currentServer.renewLease(lockid, this.clientid);
} catch (IOException e) {
try {
this.currentServer.abort(this.currentRegion, this.clientid, lockid);
} catch (IOException e2) {
LOG.warn(e2);
}
this.currentServer = null;
this.currentRegion = null;
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
throw e;
}
@Deprecated
public synchronized void renewLease(@SuppressWarnings("unused") long lockid) {
}
/**

View File

@ -63,7 +63,7 @@ public class TestBatchUpdate extends HBaseClusterTestCase {
/** the test case */
public void testBatchUpdate() {
try {
table.commitBatch(-1L);
table.commit(-1L);
} catch (IllegalStateException e) {
// expected
@ -72,7 +72,7 @@ public class TestBatchUpdate extends HBaseClusterTestCase {
fail();
}
long lockid = table.startBatchUpdate(new Text("row1"));
long lockid = table.startUpdate(new Text("row1"));
try {
try {
@ -86,9 +86,9 @@ public class TestBatchUpdate extends HBaseClusterTestCase {
}
table.put(lockid, CONTENTS, value);
table.delete(lockid, CONTENTS);
table.commitBatch(lockid);
table.commit(lockid);
lockid = table.startBatchUpdate(new Text("row2"));
lockid = table.startUpdate(new Text("row2"));
table.put(lockid, CONTENTS, value);
table.commit(lockid);

View File

@ -59,7 +59,7 @@ public class TestHTable extends HBaseClusterTestCase implements HConstants {
byte[] value = "value".getBytes(UTF8_ENCODING);
HTable a = new HTable(conf, tableAname);
long lockid = a.startBatchUpdate(row);
long lockid = a.startUpdate(row);
a.put(lockid, COLUMN_FAMILY, value);
a.commit(lockid);
@ -77,7 +77,7 @@ public class TestHTable extends HBaseClusterTestCase implements HConstants {
HStoreKey key = new HStoreKey();
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
while(s.next(key, results)) {
lockid = b.startBatchUpdate(key.getRow());
lockid = b.startUpdate(key.getRow());
for(Map.Entry<Text, byte[]> e: results.entrySet()) {
b.put(lockid, e.getKey(), e.getValue());
}

View File

@ -280,7 +280,7 @@ public class TestScanner2 extends HBaseClusterTestCase {
private void removeRegionFromMETA(final HTable t, final Text regionName)
throws IOException {
try {
long lockid = t.startBatchUpdate(regionName);
long lockid = t.startUpdate(regionName);
t.delete(lockid, HConstants.COL_REGIONINFO);
t.delete(lockid, HConstants.COL_SERVER);
t.delete(lockid, HConstants.COL_STARTCODE);

View File

@ -461,7 +461,7 @@ public class TestSplit extends HBaseTestCase {
for (char e = thirdCharStart; e <= LAST_CHAR; e++) {
byte [] bytes = new byte [] {(byte)c, (byte)d, (byte)e};
Text t = new Text(new String(bytes));
long lockid = table.startBatchUpdate(t);
long lockid = table.startUpdate(t);
try {
table.put(lockid, new Text(column), bytes);
table.commit(lockid, System.currentTimeMillis());