diff --git a/CHANGES.txt b/CHANGES.txt
index bbae1a52714..dbaf2113650 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -77,3 +77,5 @@ Trunk (unreleased changes)
      region server
  49. HADOOP-1646 RegionServer OOME's under sustained, substantial loading by
      10 concurrent clients
+ 50. HADOOP-1468 Add HBase batch update to reduce RPC overhead (restrict batches
+     to a single row at a time)
diff --git a/src/java/org/apache/hadoop/hbase/HClient.java b/src/java/org/apache/hadoop/hbase/HClient.java
index 138cd169e83..9798c870c8d 100644
--- a/src/java/org/apache/hadoop/hbase/HClient.java
+++ b/src/java/org/apache/hadoop/hbase/HClient.java
@@ -22,13 +22,15 @@ package org.apache.hadoop.hbase;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Random;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceArray;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -61,99 +63,9 @@ public class HClient implements HConstants {
   int numRetries;
   HMasterInterface master;
   private final Configuration conf;
-  private volatile long currentLockId;
+  private AtomicLong currentLockId;
   private Class serverInterfaceClass;
-  
-  protected class BatchHandler {
-    private HashMap<RegionLocation, BatchUpdate> regionToBatch;
-    private HashMap<Long, BatchUpdate> lockToBatch;
-    
-    /** constructor */
-    public BatchHandler() {
-      this.regionToBatch = new HashMap<RegionLocation, BatchUpdate>();
-      this.lockToBatch = new HashMap<Long, BatchUpdate>();
-    }
-    
-    /**
-     * Start a batch row insertion/update.
-     * 
-     * Manages multiple batch updates that are targeted for multiple servers,
-     * should the rows span several region servers.
-     * 
-     * No changes are committed until the client commits the batch operation via
-     * HClient.batchCommit().
-     * 
-     * The entire batch update can be abandoned by calling HClient.batchAbort();
-     * 
-     * Callers to this method are given a handle that corresponds to the row being
-     * changed. The handle must be supplied on subsequent put or delete calls so
-     * that the row can be identified.
-     * 
-     * @param row Name of row to start update against.
-     * @return Row lockid.
-     */
-    public synchronized long startUpdate(Text row) {
-      RegionLocation info = getRegionLocation(row);
-      BatchUpdate batch = regionToBatch.get(info);
-      if(batch == null) {
-        batch = new BatchUpdate();
-        regionToBatch.put(info, batch);
-      }
-      long lockid = batch.startUpdate(row);
-      lockToBatch.put(lockid, batch);
-      return lockid;
-    }
-    
-    /**
-     * Change the value for the specified column
-     *
-     * @param lockid lock id returned from startUpdate
-     * @param column column whose value is being set
-     * @param value new value for column
-     */
-    public synchronized void put(long lockid, Text column, byte[] value) {
-      BatchUpdate batch = lockToBatch.get(lockid);
-      if (batch == null) {
-        throw new IllegalArgumentException("invalid lock id " + lockid);
-      }
-      batch.put(lockid, column, value);
-    }
-    
-    /**
-     * Delete the value for a column
-     *
-     * @param lockid - lock id returned from startUpdate
-     * @param column - name of column whose value is to be deleted
-     */
-    public synchronized void delete(long lockid, Text column) {
-      BatchUpdate batch = lockToBatch.get(lockid);
-      if (batch == null) {
-        throw new IllegalArgumentException("invalid lock id " + lockid);
-      }
-      batch.delete(lockid, column);
-    }
-    
-    /**
-     * Finalize a batch mutation
-     *
-     * @param timestamp time to associate with all the changes
-     * @throws IOException
-     */
-    public synchronized void commit(long timestamp) throws IOException {
-      try {
-        for(Map.Entry<RegionLocation, BatchUpdate> e: regionToBatch.entrySet()) {
-          RegionLocation r = e.getKey();
-          HRegionInterface server = getHRegionConnection(r.serverAddress);
-          server.batchUpdate(r.regionInfo.getRegionName(), timestamp,
-              e.getValue());
-        }
-      } catch (RemoteException e) {
-        throw RemoteExceptionHandler.decodeRemoteException(e);
-      }
-    }
-  }
-  
-  private BatchHandler batch;
+  private AtomicReference<BatchUpdate> batch;
 
   /*
    * Data structure that holds current location for a region and its info.
@@ -606,8 +518,8 @@ public class HClient implements HConstants {
    */
   public HClient(Configuration conf) {
     this.conf = conf;
-    this.batch = null;
-    this.currentLockId = -1;
+    this.batch = new AtomicReference<BatchUpdate>();
+    this.currentLockId = new AtomicLong(-1L);
 
     this.pause = conf.getLong("hbase.client.pause", 30 * 1000);
     this.numRetries = conf.getInt("hbase.client.retries.number", 5);
@@ -1159,7 +1071,7 @@ public class HClient implements HConstants {
     if(tableName == null || tableName.getLength() == 0) {
       throw new IllegalArgumentException("table name cannot be null or zero length");
     }
-    if(this.currentLockId != -1 || batch != null) {
+    if(this.currentLockId.get() != -1L || batch.get() != null) {
       throw new IllegalStateException("update in progress");
     }
     this.currentTableServers = tableServers.getTableServers(tableName);
@@ -1481,51 +1393,90 @@ public class HClient implements HConstants {
    * 
    * No changes are committed until the call to commitBatchUpdate returns.
    * A call to abortBatchUpdate will abandon the entire batch.
-   * 
-   * Note that in batch mode, calls to commit or abort are ignored.
+   * 
+   * @param row name of row to be updated
+   * @return lockid to be used in subsequent put, delete and commit calls
    */
-  public synchronized void startBatchUpdate() {
-    if(this.currentTableServers == null) {
+  public synchronized long startBatchUpdate(final Text row) {
+    if (this.currentTableServers == null) {
       throw new IllegalStateException("Must open table first");
     }
-    
-    if(batch == null) {
-      batch = new BatchHandler();
+    if (batch.get() != null) {
+      throw new IllegalStateException("batch update in progress");
     }
+    batch.set(new BatchUpdate());
+    return batch.get().startUpdate(row);
   }
 
   /**
    * Abort a batch mutation
+   * @param lockid lock id returned by startBatchUpdate
    */
-  public synchronized void abortBatch() {
-    batch = null;
+  public synchronized void abortBatch(final long lockid) {
+    BatchUpdate u = batch.get();
+    if (u == null) {
+      throw new IllegalStateException("no batch update in progress");
+    }
+    if (u.getLockid() != lockid) {
+      throw new IllegalArgumentException("invalid lock id " + lockid);
+    }
+    batch.set(null);
   }
 
   /**
    * Finalize a batch mutation
    *
+   * @param lockid lock id returned by startBatchUpdate
    * @throws IOException
    */
-  public synchronized void commitBatch() throws IOException {
-    commitBatch(System.currentTimeMillis());
+  public void commitBatch(final long lockid) throws IOException {
+    commitBatch(lockid, System.currentTimeMillis());
   }
 
   /**
    * Finalize a batch mutation
    *
+   * @param lockid lock id returned by startBatchUpdate
    * @param timestamp time to associate with all the changes
    * @throws IOException
    */
-  public synchronized void commitBatch(long timestamp) throws IOException {
-    if(batch == null) {
+  public synchronized void commitBatch(final long lockid, final long timestamp)
+  throws IOException {
+    BatchUpdate u = batch.get();
+    if (u == null) {
       throw new IllegalStateException("no batch update in progress");
     }
+    if (u.getLockid() != lockid) {
+      throw new IllegalArgumentException("invalid lock id " + lockid);
+    }
     try {
-      batch.commit(timestamp);
-      
+      for (int tries = 0; tries < numRetries; tries++) {
+        RegionLocation r = getRegionLocation(u.getRow());
+        HRegionInterface server = getHRegionConnection(r.serverAddress);
+        try {
+          server.batchUpdate(r.getRegionInfo().getRegionName(), timestamp, u);
+          break;
+          
+        } catch (IOException e) {
+          if (tries < numRetries -1) {
+            reloadCurrentTable(r);
+            
+          } else {
+            if (e instanceof RemoteException) {
+              e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+            }
+            throw e;
+          }
+        }
+        try {
+          Thread.sleep(pause);
+          
+        } catch (InterruptedException e) {
+        }
+      }
     } finally {
-      batch = null;
+      batch.set(null);
    }
  }
@@ -1545,26 +1496,27 @@ public class HClient implements HConstants {
    * @throws IOException
    */
   public synchronized long startUpdate(final Text row) throws IOException {
-    if(this.currentLockId != -1) {
+    if (this.currentLockId.get() != -1L) {
       throw new IllegalStateException("update in progress");
     }
-    if(batch != null) {
-      return batch.startUpdate(row);
+    if (batch.get() != null) {
+      throw new IllegalStateException("batch update in progress");
     }
-    for(int tries = 0; tries < numRetries; tries++) {
+    for (int tries = 0; tries < numRetries; tries++) {
       IOException e = null;
       RegionLocation info = getRegionLocation(row);
       try {
         currentServer = getHRegionConnection(info.serverAddress);
         currentRegion = info.regionInfo.regionName;
         clientid = rand.nextLong();
-        this.currentLockId = currentServer.startUpdate(currentRegion, clientid, row);
+        this.currentLockId.set(
+            currentServer.startUpdate(currentRegion, clientid, row));
         break;
         
       } catch (IOException ex) {
         e = ex;
       }
-      if(tries < numRetries - 1) {
+      if (tries < numRetries - 1) {
         try {
           Thread.sleep(this.pause);
@@ -1577,13 +1529,13 @@ public class HClient implements HConstants {
           e = ex;
         }
       } else {
-        if(e instanceof RemoteException) {
+        if (e instanceof RemoteException) {
           e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
         }
         throw e;
       }
     }
-    return this.currentLockId;
+    return this.currentLockId.get();
   }
 
   /**
@@ -1596,29 +1548,29 @@ public class HClient implements HConstants {
    * @throws IOException
    */
   public void put(long lockid, Text column, byte val[]) throws IOException {
-    if(val == null) {
+    if (val == null) {
       throw new IllegalArgumentException("value cannot be null");
     }
-    if(batch != null) {
-      batch.put(lockid, column, val);
+    if (batch.get() != null) {
+      batch.get().put(lockid, column, val);
       return;
     }
-    if(lockid != this.currentLockId) {
+    if (lockid != this.currentLockId.get()) {
       throw new IllegalArgumentException("invalid lockid");
     }
     try {
       this.currentServer.put(this.currentRegion, this.clientid, lockid, column, val);
-    } catch(IOException e) {
+    } catch (IOException e) {
       try {
         this.currentServer.abort(this.currentRegion, this.clientid, lockid);
-      } catch(IOException e2) {
+      } catch (IOException e2) {
         LOG.warn(e2);
       }
       this.currentServer = null;
       this.currentRegion = null;
-      if(e instanceof RemoteException) {
+      if (e instanceof RemoteException) {
         e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
       }
       throw e;
@@ -1633,18 +1585,18 @@ public class HClient implements HConstants {
    * @throws IOException
    */
   public void delete(long lockid, Text column) throws IOException {
-    if(batch != null) {
-      batch.delete(lockid, column);
+    if (batch.get() != null) {
+      batch.get().delete(lockid, column);
       return;
     }
-    if(lockid != this.currentLockId) {
+    if (lockid != this.currentLockId.get()) {
       throw new IllegalArgumentException("invalid lockid");
     }
     try {
       this.currentServer.delete(this.currentRegion, this.clientid, lockid, column);
-    } catch(IOException e) {
+    } catch (IOException e) {
       try {
         this.currentServer.abort(this.currentRegion, this.clientid, lockid);
       } catch(IOException e2) {
         LOG.warn(e2);
       }
       this.currentServer = null;
       this.currentRegion = null;
-      if(e instanceof RemoteException) {
+      if (e instanceof RemoteException) {
         e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
       }
       throw e;
@@ -1666,24 +1618,25 @@ public class HClient implements HConstants {
    * @throws IOException
    */
   public void abort(long lockid) throws IOException {
-    if(batch != null) {
+    if (batch.get() != null) {
+      abortBatch(lockid);
       return;
     }
-    if(lockid != this.currentLockId) {
+    if (lockid != this.currentLockId.get()) {
       throw new IllegalArgumentException("invalid lockid");
     }
     try {
       this.currentServer.abort(this.currentRegion, this.clientid, lockid);
-    } catch(IOException e) {
+    } catch (IOException e) {
       this.currentServer = null;
       this.currentRegion = null;
-      if(e instanceof RemoteException) {
+      if (e instanceof RemoteException) {
        e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
      }
      throw e;
    } finally {
-      this.currentLockId = -1;
+      this.currentLockId.set(-1L);
    }
  }
@@ -1705,11 +1658,12 @@ public class HClient implements HConstants {
    * @throws IOException
    */
   public void commit(long lockid, long timestamp) throws IOException {
-    if(batch != null) {
+    if (batch.get() != null) {
+      commitBatch(lockid, timestamp);
       return;
     }
-    if(lockid != this.currentLockId) {
+    if (lockid != this.currentLockId.get()) {
       throw new IllegalArgumentException("invalid lockid");
     }
     try {
@@ -1724,7 +1678,7 @@ public class HClient implements HConstants {
       }
       throw e;
     } finally {
-      this.currentLockId = -1;
+      this.currentLockId.set(-1L);
     }
   }
 
@@ -1735,24 +1689,24 @@ public class HClient implements HConstants {
    * @throws IOException
    */
   public void renewLease(long lockid) throws IOException {
-    if(batch != null) {
+    if (batch.get() != null) {
       return;
     }
-    if(lockid != this.currentLockId) {
+    if (lockid != this.currentLockId.get()) {
       throw new IllegalArgumentException("invalid lockid");
     }
     try {
       this.currentServer.renewLease(lockid, this.clientid);
-    } catch(IOException e) {
+    } catch (IOException e) {
       try {
         this.currentServer.abort(this.currentRegion, this.clientid, lockid);
-      } catch(IOException e2) {
+      } catch (IOException e2) {
         LOG.warn(e2);
       }
       this.currentServer = null;
       this.currentRegion = null;
-      if(e instanceof RemoteException) {
+      if (e instanceof RemoteException) {
         e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
       }
       throw e;
@@ -1770,7 +1724,7 @@ public class HClient implements HConstants {
     private Text startRow;
     private long scanTime;
     private boolean closed;
-    private volatile RegionLocation[] regions;
+    private AtomicReferenceArray<RegionLocation> regions;
     @SuppressWarnings("hiding")
     private int currentRegion;
     private HRegionInterface server;
@@ -1791,7 +1745,8 @@ public class HClient implements HConstants {
       Collection<RegionLocation> info =
         currentTableServers.tailMap(firstServer).values();
       
-      this.regions = info.toArray(new RegionLocation[info.size()]);
+      this.regions = new AtomicReferenceArray<RegionLocation>(
+          info.toArray(new RegionLocation[info.size()]));
     }
 
     ClientScanner(Text[] columns, Text startRow, long timestamp,
@@ -1821,14 +1776,14 @@ public class HClient implements HConstants {
         this.scannerId = -1L;
       }
       this.currentRegion += 1;
-      if(this.currentRegion == this.regions.length) {
+      if(this.currentRegion == this.regions.length()) {
         close();
         return false;
       }
       try {
         for(int tries = 0; tries < numRetries; tries++) {
-          RegionLocation info = this.regions[currentRegion];
-          this.server = getHRegionConnection(this.regions[currentRegion].serverAddress);
+          RegionLocation info = this.regions.get(currentRegion);
+          this.server = getHRegionConnection(info.serverAddress);
           
           try {
             if (this.filter == null) {
diff --git a/src/java/org/apache/hadoop/hbase/HRegionServer.java b/src/java/org/apache/hadoop/hbase/HRegionServer.java
index b8a5d6f3e65..622f64ceeb3 100644
--- a/src/java/org/apache/hadoop/hbase/HRegionServer.java
+++ b/src/java/org/apache/hadoop/hbase/HRegionServer.java
@@ -976,23 +976,20 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
    */
   public void batchUpdate(Text regionName, long timestamp, BatchUpdate b)
     throws IOException {
-    for(Map.Entry<Text, ArrayList<BatchOperation>> e: b) {
-      Text row = e.getKey();
-      long clientid = rand.nextLong();
-      long lockid = startUpdate(regionName, clientid, row);
-      for(BatchOperation op: e.getValue()) {
-        switch(op.getOp()) {
-        case BatchOperation.PUT_OP:
-          put(regionName, clientid, lockid, op.getColumn(), op.getValue());
-          break;
-          
-        case BatchOperation.DELETE_OP:
-          delete(regionName, clientid, lockid, op.getColumn());
-          break;
-        }
+    long clientid = rand.nextLong();
+    long lockid = startUpdate(regionName, clientid, b.getRow());
+    for(BatchOperation op: b) {
+      switch(op.getOp()) {
+      case BatchOperation.PUT_OP:
+        put(regionName, clientid, lockid, op.getColumn(), op.getValue());
+        break;
+        
+      case BatchOperation.DELETE_OP:
+        delete(regionName, clientid, lockid, op.getColumn());
+        break;
       }
-      commit(regionName, clientid, lockid, timestamp);
     }
+    commit(regionName, clientid, lockid, timestamp);
   }
 
   /**
diff --git a/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java b/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java
index 50754d5cf1f..469a1de7ecd 100644
--- a/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java
+++ b/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java
@@ -23,9 +23,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
-import java.util.Map;
 import java.util.Random;
 
 import org.apache.hadoop.io.Text;
@@ -38,25 +36,38 @@ import org.apache.hadoop.io.Writable;
  * can result in multiple BatchUpdate objects if the batch contains rows that
  * are served by multiple region servers.
  */
-public class BatchUpdate implements Writable,
-Iterable<Map.Entry<Text, ArrayList<BatchOperation>>> {
+public class BatchUpdate implements Writable, Iterable<BatchOperation> {
   // used to generate lock ids
   private Random rand;
+  
+  // the row being updated
+  private Text row;
 
-  // used on client side to map lockid to a set of row updates
-  private HashMap<Long, ArrayList<BatchOperation>> lockToRowOps;
+  // the lockid
+  private long lockid;
 
-  // the operations for each row
-  private HashMap<Text, ArrayList<BatchOperation>> operations;
+  // the batched operations
+  private ArrayList<BatchOperation> operations;
 
   /** constructor */
   public BatchUpdate() {
     this.rand = new Random();
-    this.lockToRowOps = new HashMap<Long, ArrayList<BatchOperation>>();
-    this.operations = new HashMap<Text, ArrayList<BatchOperation>>();
+    this.row = new Text();
+    this.lockid = -1L;
+    this.operations = new ArrayList<BatchOperation>();
   }
 
+  /** @return the lock id */
+  public long getLockid() {
+    return lockid;
+  }
+  
+  /** @return the row */
+  public Text getRow() {
+    return row;
+  }
+  
   /**
    * Start a batch row insertion/update.
    *
@@ -66,21 +77,15 @@ Iterable<Map.Entry<Text, ArrayList<BatchOperation>>> {
    * The entire batch update can be abandoned by calling HClient.batchAbort();
    *
    * Callers to this method are given a handle that corresponds to the row being
-   * changed. The handle must be supplied on subsequent put or delete calls so
-   * that the row can be identified.
+   * changed. The handle must be supplied on subsequent put or delete calls.
    * 
    * @param row Name of row to start update against.
    * @return Row lockid.
    */
-  public synchronized long startUpdate(Text row) {
-    Long lockid = Long.valueOf(Math.abs(rand.nextLong()));
-    ArrayList<BatchOperation> ops = operations.get(row);
-    if(ops == null) {
-      ops = new ArrayList<BatchOperation>();
-      operations.put(row, ops);
-    }
-    lockToRowOps.put(lockid, ops);
-    return lockid.longValue();
+  public synchronized long startUpdate(final Text row) {
+    this.row = row;
+    this.lockid = Long.valueOf(Math.abs(rand.nextLong()));
+    return this.lockid;
   }
 
   /**
@@ -90,12 +95,12 @@ Iterable<Map.Entry<Text, ArrayList<BatchOperation>>> {
    * @param column - column whose value is being set
    * @param val - new value for column
    */
-  public synchronized void put(long lockid, Text column, byte val[]) {
-    ArrayList<BatchOperation> ops = lockToRowOps.get(lockid);
-    if(ops == null) {
-      throw new IllegalArgumentException("no row for lockid " + lockid);
+  public synchronized void put(final long lockid, final Text column,
+      final byte val[]) {
+    if(this.lockid != lockid) {
+      throw new IllegalArgumentException("invalid lockid " + lockid);
     }
-    ops.add(new BatchOperation(column, val));
+    operations.add(new BatchOperation(column, val));
   }
 
   /**
@@ -104,12 +109,11 @@ Iterable<Map.Entry<Text, ArrayList<BatchOperation>>> {
    * @param lockid - lock id returned from startUpdate
    * @param column - name of column whose value is to be deleted
    */
-  public synchronized void delete(long lockid, Text column) {
-    ArrayList<BatchOperation> ops = lockToRowOps.get(lockid);
-    if(ops == null) {
-      throw new IllegalArgumentException("no row for lockid " + lockid);
+  public synchronized void delete(final long lockid, final Text column) {
+    if(this.lockid != lockid) {
+      throw new IllegalArgumentException("invalid lockid " + lockid);
    }
-    ops.add(new BatchOperation(column));
+    operations.add(new BatchOperation(column));
  }
 
  //
@@ -117,11 +121,10 @@ Iterable<Map.Entry<Text, ArrayList<BatchOperation>>> {
   /**
-   * @return Iterator<Map.Entry<Text, ArrayList<BatchOperation>>>
-   * Text row -> ArrayList<BatchOperation> changes
+   * @return Iterator<BatchOperation>
    */
-  public Iterator<Map.Entry<Text, ArrayList<BatchOperation>>> iterator() {
-    return operations.entrySet().iterator();
+  public Iterator<BatchOperation> iterator() {
+    return operations.iterator();
   }
 
   //
@@ -132,20 +135,12 @@ Iterable<Map.Entry<Text, ArrayList<BatchOperation>>> {
    * {@inheritDoc}
    */
   public void readFields(DataInput in) throws IOException {
+    row.readFields(in);
     int nOps = in.readInt();
     for (int i = 0; i < nOps; i++) {
-      Text row = new Text();
-      row.readFields(in);
-      
-      int nRowOps = in.readInt();
-      ArrayList<BatchOperation> rowOps = new ArrayList<BatchOperation>();
-      for(int j = 0; j < nRowOps; j++) {
-        BatchOperation op = new BatchOperation();
-        op.readFields(in);
-        rowOps.add(op);
-      }
-      
-      operations.put(row, rowOps);
+      BatchOperation op = new BatchOperation();
+      op.readFields(in);
+      operations.add(op);
     }
   }
 
@@ -153,16 +148,10 @@ Iterable<Map.Entry<Text, ArrayList<BatchOperation>>> {
    * {@inheritDoc}
    */
   public void write(DataOutput out) throws IOException {
+    row.write(out);
     out.writeInt(operations.size());
-    for (Map.Entry<Text, ArrayList<BatchOperation>> e: operations.entrySet()) {
-      e.getKey().write(out);
-      
-      ArrayList<BatchOperation> ops = e.getValue();
-      out.writeInt(ops.size());
-      
-      for(BatchOperation op: ops) {
-        op.write(out);
-      }
+    for (BatchOperation op: operations) {
+      op.write(out);
     }
   }
 }
diff --git a/src/test/org/apache/hadoop/hbase/TestBatchUpdate.java b/src/test/org/apache/hadoop/hbase/TestBatchUpdate.java
index 2aaf2301818..bbc903912ce 100644
--- a/src/test/org/apache/hadoop/hbase/TestBatchUpdate.java
+++ b/src/test/org/apache/hadoop/hbase/TestBatchUpdate.java
@@ -19,6 +19,7 @@
  */
 package org.apache.hadoop.hbase;
 
+import java.io.UnsupportedEncodingException;
 import java.util.Map;
 import java.util.TreeMap;
 import org.apache.hadoop.io.Text;
@@ -29,11 +30,21 @@ import org.apache.hadoop.io.Text;
 public class TestBatchUpdate extends HBaseClusterTestCase {
   private static final String CONTENTS_STR = "contents:";
   private static final Text CONTENTS = new Text(CONTENTS_STR);
-  private static final byte[] value = { 1, 2, 3, 4 };
+  private byte[] value;
   
   private HTableDescriptor desc = null;
   private HClient client = null;
 
+  /** constructor */
+  public TestBatchUpdate() {
+    try {
+      value = "abcd".getBytes(HConstants.UTF8_ENCODING);
+      
+    } catch (UnsupportedEncodingException e) {
+      fail();
+    }
+  }
+  
   /**
    * {@inheritDoc}
    */
@@ -56,7 +67,7 @@ public class TestBatchUpdate extends HBaseClusterTestCase {
   /** the test case */
   public void testBatchUpdate() {
     try {
-      client.commitBatch();
+      client.commitBatch(-1L);
       
     } catch (IllegalStateException e) {
       // expected
@@ -65,7 +76,7 @@ public class TestBatchUpdate extends HBaseClusterTestCase {
       fail();
     }
     
-    client.startBatchUpdate();
+    long lockid = client.startBatchUpdate(new Text("row1"));
     
     try {
       client.openTable(HConstants.META_TABLE_NAME);
@@ -77,14 +88,22 @@ public class TestBatchUpdate extends HBaseClusterTestCase {
       fail();
     }
     try {
-      long lockid = client.startUpdate(new Text("row1"));
+      try {
+        @SuppressWarnings("unused")
+        long dummy = client.startUpdate(new Text("row2"));
+      } catch (IllegalStateException e) {
+        // expected
+      } catch (Exception e) {
+        e.printStackTrace();
+        fail();
+      }
       client.put(lockid, CONTENTS, value);
       client.delete(lockid, CONTENTS);
+      client.commitBatch(lockid);
      
-      lockid = client.startUpdate(new Text("row2"));
+      lockid = client.startBatchUpdate(new Text("row2"));
       client.put(lockid, CONTENTS, value);
-      
-      client.commitBatch();
+      client.commit(lockid);
      
       Text[] columns = { CONTENTS };
       HScannerInterface scanner = client.obtainScanner(columns, new Text());
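For reviewers, a minimal usage sketch of the single-row batch API this patch adds to HClient (not part of the patch itself). The class name ExampleBatchClient, the table name "mytable", and the column names are invented for illustration; configuration setup, table creation, and error handling are omitted.

// Hypothetical illustration only. Assumes an already-constructed HClient and
// an existing table "mytable" with a "colfam1:" column family.
import java.io.IOException;

import org.apache.hadoop.hbase.HClient;
import org.apache.hadoop.io.Text;

public class ExampleBatchClient {

  /**
   * Buffers a put and a delete against a single row and ships them to the
   * hosting region server as one batchUpdate RPC.
   */
  static void updateOneRow(final HClient client, final byte[] value)
  throws IOException {
    // The table must be open before a batch can be started.
    client.openTable(new Text("mytable"));

    // All edits in the batch must target this single row.
    long lockid = client.startBatchUpdate(new Text("row1"));

    // put/delete are buffered client-side in a BatchUpdate; no RPC happens yet.
    client.put(lockid, new Text("colfam1:a"), value);
    client.delete(lockid, new Text("colfam1:b"));

    // commit() notices the open batch and forwards to commitBatch(lockid),
    // sending the whole BatchUpdate to the region server in a single call.
    client.commit(lockid);

    // client.abort(lockid) would instead forward to abortBatch(lockid) and
    // discard the buffered edits without contacting the server.
  }
}

Because the batch is restricted to one row, every buffered operation lands in the same region, which is what lets commitBatch resolve a single RegionLocation for the row and retry the lone batchUpdate call if it fails.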