From cbb844d5f0be591c76e84e325b40fc313851d57c Mon Sep 17 00:00:00 2001 From: Jim Kellerman Date: Mon, 16 Jul 2007 22:19:59 +0000 Subject: [PATCH] HADOOP-1468 Add HBase batch update to reduce RPC overhead git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@556754 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 2 + src/java/org/apache/hadoop/hbase/HClient.java | 227 +++++++++++++++++- .../org/apache/hadoop/hbase/HRegionInfo.java | 138 ++++++----- .../apache/hadoop/hbase/HRegionInterface.java | 11 + .../apache/hadoop/hbase/HRegionServer.java | 25 ++ .../apache/hadoop/hbase/HServerAddress.java | 85 ++++++- .../hadoop/hbase/io/BatchOperation.java | 121 ++++++++++ .../apache/hadoop/hbase/io/BatchUpdate.java | 168 +++++++++++++ .../apache/hadoop/hbase/TestBatchUpdate.java | 104 ++++++++ 9 files changed, 795 insertions(+), 86 deletions(-) create mode 100644 src/java/org/apache/hadoop/hbase/io/BatchOperation.java create mode 100644 src/java/org/apache/hadoop/hbase/io/BatchUpdate.java create mode 100644 src/test/org/apache/hadoop/hbase/TestBatchUpdate.java diff --git a/CHANGES.txt b/CHANGES.txt index a483d1695a0..80b4bf71e16 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -63,3 +63,5 @@ Trunk (unreleased changes) 39. HADOOP-1581 Un-openable tablename bug 40. HADOOP-1607 [shell] Clear screen command (Edward Yoon via Stack) 41. HADOOP-1614 [hbase] HClient does not protect itself from simultaneous updates + 42. HADOOP-1468 Add HBase batch update to reduce RPC overhead + diff --git a/src/java/org/apache/hadoop/hbase/HClient.java b/src/java/org/apache/hadoop/hbase/HClient.java index f233b4a9e9e..38c61e45a45 100644 --- a/src/java/org/apache/hadoop/hbase/HClient.java +++ b/src/java/org/apache/hadoop/hbase/HClient.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.Map; import java.util.NoSuchElementException; import java.util.Random; @@ -33,6 +34,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.filter.RowFilterInterface; +import org.apache.hadoop.hbase.io.BatchUpdate; import org.apache.hadoop.hbase.io.KeyedData; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.Text; @@ -62,10 +64,102 @@ public class HClient implements HConstants { private long currentLockId; private Class serverInterfaceClass; + protected class BatchHandler { + private HashMap regionToBatch; + private HashMap lockToBatch; + + /** constructor */ + public BatchHandler() { + this.regionToBatch = new HashMap(); + this.lockToBatch = new HashMap(); + } + + /** + * Start a batch row insertion/update. + * + * Manages multiple batch updates that are targeted for multiple servers, + * should the rows span several region servers. + * + * No changes are committed until the client commits the batch operation via + * HClient.batchCommit(). + * + * The entire batch update can be abandoned by calling HClient.batchAbort(); + * + * Callers to this method are given a handle that corresponds to the row being + * changed. The handle must be supplied on subsequent put or delete calls so + * that the row can be identified. + * + * @param row Name of row to start update against. + * @return Row lockid. 
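+ *
+ * (In batch mode this lockid is generated locally by BatchUpdate and is
+ * only meaningful to this client: no server-side row lease is taken
+ * until the batch is committed via HClient.commitBatch().)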
+ */ + public synchronized long startUpdate(Text row) { + RegionLocation info = getRegionLocation(row); + BatchUpdate batch = regionToBatch.get(info); + if(batch == null) { + batch = new BatchUpdate(); + regionToBatch.put(info, batch); + } + long lockid = batch.startUpdate(row); + lockToBatch.put(lockid, batch); + return lockid; + } + + /** + * Change the value for the specified column + * + * @param lockid lock id returned from startUpdate + * @param column column whose value is being set + * @param value new value for column + */ + public synchronized void put(long lockid, Text column, byte[] value) { + BatchUpdate batch = lockToBatch.get(lockid); + if (batch == null) { + throw new IllegalArgumentException("invalid lock id " + lockid); + } + batch.put(lockid, column, value); + } + + /** + * Delete the value for a column + * + * @param lockid - lock id returned from startUpdate + * @param column - name of column whose value is to be deleted + */ + public synchronized void delete(long lockid, Text column) { + BatchUpdate batch = lockToBatch.get(lockid); + if (batch == null) { + throw new IllegalArgumentException("invalid lock id " + lockid); + } + batch.delete(lockid, column); + } + + /** + * Finalize a batch mutation + * + * @param timestamp time to associate with all the changes + * @throws IOException + */ + public synchronized void commit(long timestamp) throws IOException { + try { + for(Map.Entry e: regionToBatch.entrySet()) { + RegionLocation r = e.getKey(); + HRegionInterface server = getHRegionConnection(r.serverAddress); + server.batchUpdate(r.regionInfo.getRegionName(), timestamp, + e.getValue()); + } + } catch (RemoteException e) { + throw RemoteExceptionHandler.decodeRemoteException(e); + } + } + } + + private BatchHandler batch; + /* * Data structure that holds current location for a region and its info. */ - protected static class RegionLocation { + @SuppressWarnings("unchecked") + protected static class RegionLocation implements Comparable { HRegionInfo regionInfo; HServerAddress serverAddress; @@ -84,18 +178,48 @@ public class HClient implements HConstants { } /** - * @return HRegionInfo + * {@inheritDoc} */ + @Override + public boolean equals(Object o) { + return this.compareTo(o) == 0; + } + + /** + * {@inheritDoc} + */ + @Override + public int hashCode() { + int result = this.regionInfo.hashCode(); + result ^= this.serverAddress.hashCode(); + return result; + } + + /** @return HRegionInfo */ public HRegionInfo getRegionInfo(){ return regionInfo; } - /** - * @return HServerAddress - */ + /** @return HServerAddress */ public HServerAddress getServerAddress(){ return serverAddress; } + + // + // Comparable + // + + /** + * {@inheritDoc} + */ + public int compareTo(Object o) { + RegionLocation other = (RegionLocation) o; + int result = this.regionInfo.compareTo(other.regionInfo); + if(result == 0) { + result = this.serverAddress.compareTo(other.serverAddress); + } + return result; + } } // Map tableName -> (Map startRow -> (HRegionInfo, HServerAddress) @@ -121,6 +245,7 @@ public class HClient implements HConstants { */ public HClient(Configuration conf) { this.conf = conf; + this.batch = null; this.currentLockId = -1; this.pause = conf.getLong("hbase.client.pause", 30 * 1000); @@ -222,8 +347,6 @@ public class HClient implements HConstants { * * @param desc table descriptor for table * - * @throws RemoteException if exception occurred on remote side of - * connection. 
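+ * <p>A minimal sketch of use:
+ * <pre>
+ * HClient client = new HClient(conf);
+ * client.createTable(new HTableDescriptor("test"));
+ * </pre>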
* @throws IllegalArgumentException if the table name is reserved * @throws MasterNotRunningException if master is not running * @throws NoServerForRegionException if root region is not being served @@ -254,8 +377,6 @@ public class HClient implements HConstants { * * @param desc table descriptor for table * - * @throws RemoteException if exception occurred on remote side of - * connection. * @throws IllegalArgumentException if the table name is reserved * @throws MasterNotRunningException if master is not running * @throws NoServerForRegionException if root region is not being served @@ -265,7 +386,7 @@ public class HClient implements HConstants { * @throws IOException */ public synchronized void createTableAsync(HTableDescriptor desc) - throws IOException { + throws IOException { checkReservedTableName(desc.getName()); checkMaster(); try { @@ -610,7 +731,7 @@ public class HClient implements HConstants { if(tableName == null || tableName.getLength() == 0) { throw new IllegalArgumentException("table name cannot be null or zero length"); } - if(this.currentLockId != -1) { + if(this.currentLockId != -1 || batch != null) { throw new IllegalStateException("update in progress"); } this.tableServers = getTableServers(tableName); @@ -1315,6 +1436,59 @@ public class HClient implements HConstants { return new ClientScanner(columns, startRow, timestamp, filter); } + /** + * Start a batch of row insertions/updates. + * + * No changes are committed until the call to commitBatchUpdate returns. + * A call to abortBatchUpdate will abandon the entire batch. + * + * Note that in batch mode, calls to commit or abort are ignored. + */ + public synchronized void startBatchUpdate() { + if(this.tableServers == null) { + throw new IllegalStateException("Must open table first"); + } + + if(batch == null) { + batch = new BatchHandler(); + } + } + + /** + * Abort a batch mutation + */ + public synchronized void abortBatch() { + batch = null; + } + + /** + * Finalize a batch mutation + * + * @throws IOException + */ + public synchronized void commitBatch() throws IOException { + commitBatch(System.currentTimeMillis()); + } + + /** + * Finalize a batch mutation + * + * @param timestamp time to associate with all the changes + * @throws IOException + */ + public synchronized void commitBatch(long timestamp) throws IOException { + if(batch == null) { + throw new IllegalStateException("no batch update in progress"); + } + + try { + batch.commit(timestamp); + + } finally { + batch = null; + } + } + /** * Start an atomic row insertion/update. No changes are committed until the * call to commit() returns. A call to abort() will abandon any updates in progress. 
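+ * <p>If a batch update is in progress (see startBatchUpdate()), the row
+ * is added to the batch instead and nothing is sent to the server until
+ * commitBatch() is called. A minimal sketch of batch usage, assuming an
+ * open table with a "contents:" family as in TestBatchUpdate:
+ * <pre>
+ * client.startBatchUpdate();
+ * long lockid = client.startUpdate(new Text("row1"));
+ * client.put(lockid, new Text("contents:"), value);
+ * client.commitBatch();
+ * </pre>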
@@ -1334,6 +1508,9 @@ public class HClient implements HConstants { if(this.currentLockId != -1) { throw new IllegalStateException("update in progress"); } + if(batch != null) { + return batch.startUpdate(row); + } for(int tries = 0; tries < numRetries; tries++) { IOException e = null; RegionLocation info = getRegionLocation(row); @@ -1379,6 +1556,14 @@ public class HClient implements HConstants { * @throws IOException */ public void put(long lockid, Text column, byte val[]) throws IOException { + if(val == null) { + throw new IllegalArgumentException("value cannot be null"); + } + if(batch != null) { + batch.put(lockid, column, val); + return; + } + if(lockid != this.currentLockId) { throw new IllegalArgumentException("invalid lockid"); } @@ -1408,6 +1593,11 @@ public class HClient implements HConstants { * @throws IOException */ public void delete(long lockid, Text column) throws IOException { + if(batch != null) { + batch.delete(lockid, column); + return; + } + if(lockid != this.currentLockId) { throw new IllegalArgumentException("invalid lockid"); } @@ -1436,6 +1626,10 @@ public class HClient implements HConstants { * @throws IOException */ public void abort(long lockid) throws IOException { + if(batch != null) { + return; + } + if(lockid != this.currentLockId) { throw new IllegalArgumentException("invalid lockid"); } @@ -1471,6 +1665,10 @@ public class HClient implements HConstants { * @throws IOException */ public void commit(long lockid, long timestamp) throws IOException { + if(batch != null) { + return; + } + if(lockid != this.currentLockId) { throw new IllegalArgumentException("invalid lockid"); } @@ -1497,6 +1695,13 @@ public class HClient implements HConstants { * @throws IOException */ public void renewLease(long lockid) throws IOException { + if(batch != null) { + return; + } + + if(lockid != this.currentLockId) { + throw new IllegalArgumentException("invalid lockid"); + } try { this.currentServer.renewLease(lockid, this.clientid); } catch(IOException e) { diff --git a/src/java/org/apache/hadoop/hbase/HRegionInfo.java b/src/java/org/apache/hadoop/hbase/HRegionInfo.java index 401a5087e25..a817bdf7f03 100644 --- a/src/java/org/apache/hadoop/hbase/HRegionInfo.java +++ b/src/java/org/apache/hadoop/hbase/HRegionInfo.java @@ -34,13 +34,39 @@ import org.apache.hadoop.io.WritableComparable; * HRegions' table descriptor, etc. */ public class HRegionInfo implements WritableComparable { + /** delimiter used between portions of a region name */ + public static final char DELIMITER = ','; + + /** + * Extracts table name prefix from a region name. + * Presumes region names are ASCII characters only. + * @param regionName A region name. + * @return The table prefix of a region name. 
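+ * For example, region names have the form
+ * <i>tablename</i>,<i>startKey</i>,<i>regionId</i>, so a region name
+ * like <code>mytable,,6437434305</code> yields <code>mytable</code>.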
+ */ + public static Text getTableNameFromRegionName(final Text regionName) { + int index = -1; + byte [] bytes = regionName.getBytes(); + for (int i = 0; i < bytes.length; i++) { + if (((char) bytes[i]) == DELIMITER) { + index = i; + break; + } + } + if (index == -1) { + throw new IllegalArgumentException(regionName.toString() + " does not " + + "contain " + DELIMITER + " character"); + } + byte [] tableName = new byte[index]; + System.arraycopy(bytes, 0, tableName, 0, index); + return new Text(tableName); + } + Text regionName; long regionId; Text startKey; Text endKey; boolean offLine; HTableDescriptor tableDesc; - public static final char DELIMITER = ','; /** Default constructor - creates empty object */ public HRegionInfo() { @@ -100,6 +126,34 @@ public class HRegionInfo implements WritableComparable { this.offLine = false; } + /** @return the endKey */ + public Text getEndKey(){ + return endKey; + } + + /** @return the regionId */ + public long getRegionId(){ + return regionId; + } + + /** @return the regionName */ + public Text getRegionName(){ + return regionName; + } + + /** @return the startKey */ + public Text getStartKey(){ + return startKey; + } + + /** @return the tableDesc */ + public HTableDescriptor getTableDesc(){ + return tableDesc; + } + + /** + * {@inheritDoc} + */ @Override public String toString() { return "regionname: " + this.regionName.toString() + ", startKey: <" + @@ -107,11 +161,17 @@ public class HRegionInfo implements WritableComparable { this.tableDesc.toString() + "}"; } + /** + * {@inheritDoc} + */ @Override public boolean equals(Object o) { return this.compareTo(o) == 0; } + /** + * {@inheritDoc} + */ @Override public int hashCode() { int result = this.regionName.hashCode(); @@ -123,10 +183,13 @@ public class HRegionInfo implements WritableComparable { return result; } - ////////////////////////////////////////////////////////////////////////////// + // // Writable - ////////////////////////////////////////////////////////////////////////////// + // + /** + * {@inheritDoc} + */ public void write(DataOutput out) throws IOException { out.writeLong(regionId); tableDesc.write(out); @@ -136,6 +199,9 @@ public class HRegionInfo implements WritableComparable { out.writeBoolean(offLine); } + /** + * {@inheritDoc} + */ public void readFields(DataInput in) throws IOException { this.regionId = in.readLong(); this.tableDesc.readFields(in); @@ -145,69 +211,13 @@ public class HRegionInfo implements WritableComparable { this.offLine = in.readBoolean(); } - /** - * @return the endKey - */ - public Text getEndKey(){ - return endKey; - } - - /** - * @return the regionId - */ - public long getRegionId(){ - return regionId; - } - - /** - * @return the regionName - */ - public Text getRegionName(){ - return regionName; - } - - /** - * Extracts table name prefix from a region name. - * Presumes region names are ASCII characters only. - * @param regionName A region name. - * @return The table prefix of a region name. 
- */ - public static Text getTableNameFromRegionName(final Text regionName) { - int index = -1; - byte [] bytes = regionName.getBytes(); - for (int i = 0; i < bytes.length; i++) { - if (((char) bytes[i]) == DELIMITER) { - index = i; - break; - } - } - if (index == -1) { - throw new IllegalArgumentException(regionName.toString() + " does not " + - "contain " + DELIMITER + " character"); - } - byte [] tableName = new byte[index]; - System.arraycopy(bytes, 0, tableName, 0, index); - return new Text(tableName); - } - - /** - * @return the startKey - */ - public Text getStartKey(){ - return startKey; - } - - /** - * @return the tableDesc - */ - public HTableDescriptor getTableDesc(){ - return tableDesc; - } - - ///////////////////////////////////////////////////////////////////////////// + // // Comparable - ///////////////////////////////////////////////////////////////////////////// + // + /** + * {@inheritDoc} + */ public int compareTo(Object o) { HRegionInfo other = (HRegionInfo) o; diff --git a/src/java/org/apache/hadoop/hbase/HRegionInterface.java b/src/java/org/apache/hadoop/hbase/HRegionInterface.java index b4167efe88e..91dbce02263 100644 --- a/src/java/org/apache/hadoop/hbase/HRegionInterface.java +++ b/src/java/org/apache/hadoop/hbase/HRegionInterface.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase; import java.io.IOException; import org.apache.hadoop.hbase.filter.RowFilterInterface; +import org.apache.hadoop.hbase.io.BatchUpdate; import org.apache.hadoop.hbase.io.KeyedData; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.VersionedProtocol; @@ -211,6 +212,16 @@ public interface HRegionInterface extends VersionedProtocol { long timestamp, RowFilterInterface filter) throws IOException; + /** + * Applies a batch of updates via one RPC + * + * @param regionName name of the region to update + * @param timestamp the time to be associated with the changes + * @param b BatchUpdate + * @throws IOException + */ + public void batchUpdate(Text regionName, long timestamp, BatchUpdate b) throws IOException; + /** * Get the next set of values * diff --git a/src/java/org/apache/hadoop/hbase/HRegionServer.java b/src/java/org/apache/hadoop/hbase/HRegionServer.java index 761bff3f793..b9a394c09c9 100644 --- a/src/java/org/apache/hadoop/hbase/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/HRegionServer.java @@ -38,6 +38,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.filter.RowFilterInterface; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.BatchOperation; import org.apache.hadoop.hbase.io.KeyedData; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RemoteException; @@ -987,6 +989,29 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { return getRegion(regionName).getRegionInfo(); } + /** + * {@inheritDoc} + */ + public void batchUpdate(Text regionName, long timestamp, BatchUpdate b) throws IOException { + for(Map.Entry> e: b) { + Text row = e.getKey(); + long clientid = rand.nextLong(); + long lockid = startUpdate(regionName, clientid, row); + for(BatchOperation op: e.getValue()) { + switch(op.getOp()) { + case BatchOperation.PUT_OP: + put(regionName, clientid, lockid, op.getColumn(), op.getValue()); + break; + + case BatchOperation.DELETE_OP: + delete(regionName, clientid, lockid, op.getColumn()); + break; + } + } + commit(regionName, clientid, lockid, timestamp); + } + } + /** * 
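+ * (batchUpdate above replays the batch locally as startUpdate/put/delete
+ * calls followed by a single commit per row, so the client pays one RPC
+ * per region server rather than one RPC per operation.)
+ *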
{@inheritDoc} */ diff --git a/src/java/org/apache/hadoop/hbase/HServerAddress.java b/src/java/org/apache/hadoop/hbase/HServerAddress.java index dd9af9200f2..b8959ee7ee4 100644 --- a/src/java/org/apache/hadoop/hbase/HServerAddress.java +++ b/src/java/org/apache/hadoop/hbase/HServerAddress.java @@ -24,25 +24,35 @@ import org.apache.hadoop.io.*; import java.io.*; import java.net.InetSocketAddress; -/******************************************************************************* +/** * HServerAddress is a "label" for a HBase server that combines the host * name and port number. - ******************************************************************************/ -public class HServerAddress implements Writable { + */ +public class HServerAddress implements WritableComparable { private InetSocketAddress address; - private String stringValue; + String stringValue; + /** Empty constructor, used for Writable */ public HServerAddress() { this.address = null; this.stringValue = null; } - + + /** + * Construct a HServerAddress from an InetSocketAddress + * @param address InetSocketAddress of server + */ public HServerAddress(InetSocketAddress address) { this.address = address; this.stringValue = address.getAddress().getHostAddress() + ":" + address.getPort(); } + /** + * Construct a HServerAddress from a string of the form hostname:port + * + * @param hostAndPort format 'hostname:port' + */ public HServerAddress(String hostAndPort) { int colonIndex = hostAndPort.indexOf(':'); if(colonIndex < 0) { @@ -55,38 +65,76 @@ public class HServerAddress implements Writable { this.stringValue = hostAndPort; } + /** + * Construct a HServerAddress from hostname, port number + * @param bindAddress host name + * @param port port number + */ public HServerAddress(String bindAddress, int port) { this.address = new InetSocketAddress(bindAddress, port); this.stringValue = bindAddress + ":" + port; } + /** + * Construct a HServerAddress from another HServerAddress + * + * @param other the HServerAddress to copy from + */ public HServerAddress(HServerAddress other) { String bindAddress = other.getBindAddress(); int port = other.getPort(); address = new InetSocketAddress(bindAddress, port); stringValue = bindAddress + ":" + port; } - + + /** @return host name */ public String getBindAddress() { return address.getAddress().getHostAddress(); } - + + /** @return port number */ public int getPort() { return address.getPort(); } - + + /** @return the InetSocketAddress */ public InetSocketAddress getInetSocketAddress() { return address; } - + + /** + * {@inheritDoc} + */ + @Override public String toString() { return (stringValue == null ? 
"" : stringValue); } - ////////////////////////////////////////////////////////////////////////////// + /** + * {@inheritDoc} + */ + @Override + public boolean equals(Object o) { + return this.compareTo(o) == 0; + } + + /** + * {@inheritDoc} + */ + @Override + public int hashCode() { + int result = this.address.hashCode(); + result ^= this.stringValue.hashCode(); + return result; + } + + // // Writable - ////////////////////////////////////////////////////////////////////////////// + // + /** + * {@inheritDoc} + */ public void readFields(DataInput in) throws IOException { String bindAddress = in.readUTF(); int port = in.readInt(); @@ -101,6 +149,9 @@ public class HServerAddress implements Writable { } } + /** + * {@inheritDoc} + */ public void write(DataOutput out) throws IOException { if(address == null) { out.writeUTF(""); @@ -111,4 +162,16 @@ public class HServerAddress implements Writable { out.writeInt(address.getPort()); } } + + // + // Comparable + // + + /** + * {@inheritDoc} + */ + public int compareTo(Object o) { + HServerAddress other = (HServerAddress) o; + return this.toString().compareTo(other.toString()); + } } diff --git a/src/java/org/apache/hadoop/hbase/io/BatchOperation.java b/src/java/org/apache/hadoop/hbase/io/BatchOperation.java new file mode 100644 index 00000000000..e20009d7959 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/io/BatchOperation.java @@ -0,0 +1,121 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.io; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +/** + * batch update operation + */ +public class BatchOperation implements Writable { + /** put operation */ + public static final int PUT_OP = 1; + + /** delete operation */ + public static final int DELETE_OP = 2; + + private int op; + private Text column; + private byte[] value; + + /** default constructor used by Writable */ + public BatchOperation() { + this.op = 0; + this.column = new Text(); + this.value = null; + } + + /** + * Creates a put operation + * + * @param column column name + * @param value column value + */ + public BatchOperation(Text column, byte[] value) { + this.op = PUT_OP; + this.column = column; + this.value = value; + } + + /** + * Creates a delete operation + * + * @param column name of column to delete + */ + public BatchOperation(Text column) { + this.op = DELETE_OP; + this.column = column; + this.value = null; + } + + /** + * @return the column + */ + public Text getColumn() { + return column; + } + + /** + * @return the operation + */ + public int getOp() { + return op; + } + + /** + * @return the value + */ + public byte[] getValue() { + return value; + } + + // + // Writable + // + + /** + * {@inheritDoc} + */ + public void readFields(DataInput in) throws IOException { + op = in.readInt(); + column.readFields(in); + if(op == PUT_OP) { + value = new byte[in.readInt()]; + in.readFully(value); + } + } + + /** + * {@inheritDoc} + */ + public void write(DataOutput out) throws IOException { + out.writeInt(op); + column.write(out); + if(op == PUT_OP) { + out.writeInt(value.length); + out.write(value); + } + } +} diff --git a/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java b/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java new file mode 100644 index 00000000000..50754d5cf1f --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java @@ -0,0 +1,168 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Random; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +/** + * A Writable object that contains a series of BatchOperations + * + * There is one BatchUpdate object per server, so a series of batch operations + * can result in multiple BatchUpdate objects if the batch contains rows that + * are served by multiple region servers. 
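+ *
+ * <p>On the wire, a BatchUpdate is written as the number of rows it
+ * contains, then for each row the row name followed by the list of
+ * BatchOperations for that row (see write/readFields below).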
+ */ +public class BatchUpdate implements Writable, +Iterable>> { + + // used to generate lock ids + private Random rand; + + // used on client side to map lockid to a set of row updates + private HashMap> lockToRowOps; + + // the operations for each row + private HashMap> operations; + + /** constructor */ + public BatchUpdate() { + this.rand = new Random(); + this.lockToRowOps = new HashMap>(); + this.operations = new HashMap>(); + } + + /** + * Start a batch row insertion/update. + * + * No changes are committed until the client commits the batch operation via + * HClient.batchCommit(). + * + * The entire batch update can be abandoned by calling HClient.batchAbort(); + * + * Callers to this method are given a handle that corresponds to the row being + * changed. The handle must be supplied on subsequent put or delete calls so + * that the row can be identified. + * + * @param row Name of row to start update against. + * @return Row lockid. + */ + public synchronized long startUpdate(Text row) { + Long lockid = Long.valueOf(Math.abs(rand.nextLong())); + ArrayList ops = operations.get(row); + if(ops == null) { + ops = new ArrayList(); + operations.put(row, ops); + } + lockToRowOps.put(lockid, ops); + return lockid.longValue(); + } + + /** + * Change a value for the specified column + * + * @param lockid - lock id returned from startUpdate + * @param column - column whose value is being set + * @param val - new value for column + */ + public synchronized void put(long lockid, Text column, byte val[]) { + ArrayList ops = lockToRowOps.get(lockid); + if(ops == null) { + throw new IllegalArgumentException("no row for lockid " + lockid); + } + ops.add(new BatchOperation(column, val)); + } + + /** + * Delete the value for a column + * + * @param lockid - lock id returned from startUpdate + * @param column - name of column whose value is to be deleted + */ + public synchronized void delete(long lockid, Text column) { + ArrayList ops = lockToRowOps.get(lockid); + if(ops == null) { + throw new IllegalArgumentException("no row for lockid " + lockid); + } + ops.add(new BatchOperation(column)); + } + + // + // Iterable + // + + /** + * @return Iterator>> + * Text row -> ArrayList changes + */ + public Iterator>> iterator() { + return operations.entrySet().iterator(); + } + + // + // Writable + // + + /** + * {@inheritDoc} + */ + public void readFields(DataInput in) throws IOException { + int nOps = in.readInt(); + for (int i = 0; i < nOps; i++) { + Text row = new Text(); + row.readFields(in); + + int nRowOps = in.readInt(); + ArrayList rowOps = new ArrayList(); + for(int j = 0; j < nRowOps; j++) { + BatchOperation op = new BatchOperation(); + op.readFields(in); + rowOps.add(op); + } + + operations.put(row, rowOps); + } + } + + /** + * {@inheritDoc} + */ + public void write(DataOutput out) throws IOException { + out.writeInt(operations.size()); + for (Map.Entry> e: operations.entrySet()) { + e.getKey().write(out); + + ArrayList ops = e.getValue(); + out.writeInt(ops.size()); + + for(BatchOperation op: ops) { + op.write(out); + } + } + } +} diff --git a/src/test/org/apache/hadoop/hbase/TestBatchUpdate.java b/src/test/org/apache/hadoop/hbase/TestBatchUpdate.java new file mode 100644 index 00000000000..2aaf2301818 --- /dev/null +++ b/src/test/org/apache/hadoop/hbase/TestBatchUpdate.java @@ -0,0 +1,104 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.util.Map; +import java.util.TreeMap; +import org.apache.hadoop.io.Text; + +/** + * Test batch updates + */ +public class TestBatchUpdate extends HBaseClusterTestCase { + private static final String CONTENTS_STR = "contents:"; + private static final Text CONTENTS = new Text(CONTENTS_STR); + private static final byte[] value = { 1, 2, 3, 4 }; + + private HTableDescriptor desc = null; + private HClient client = null; + + /** + * {@inheritDoc} + */ + @Override + public void setUp() throws Exception { + super.setUp(); + this.client = new HClient(conf); + this.desc = new HTableDescriptor("test"); + desc.addFamily(new HColumnDescriptor(CONTENTS_STR)); + try { + client.createTable(desc); + client.openTable(desc.getName()); + + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + /** the test case */ + public void testBatchUpdate() { + try { + client.commitBatch(); + + } catch (IllegalStateException e) { + // expected + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + + client.startBatchUpdate(); + + try { + client.openTable(HConstants.META_TABLE_NAME); + + } catch (IllegalStateException e) { + // expected + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + try { + long lockid = client.startUpdate(new Text("row1")); + client.put(lockid, CONTENTS, value); + client.delete(lockid, CONTENTS); + + lockid = client.startUpdate(new Text("row2")); + client.put(lockid, CONTENTS, value); + + client.commitBatch(); + + Text[] columns = { CONTENTS }; + HScannerInterface scanner = client.obtainScanner(columns, new Text()); + HStoreKey key = new HStoreKey(); + TreeMap results = new TreeMap(); + while(scanner.next(key, results)) { + for(Map.Entry e: results.entrySet()) { + System.out.println(key + ": row: " + e.getKey() + " value: " + + new String(e.getValue())); + } + } + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } +}
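
Usage sketch (illustrative only; the table name "test", the "contents:"
column family and the byte[] value follow TestBatchUpdate above). With
this patch a client buffers row mutations locally and pays one
batchUpdate() RPC per region server at commit time, instead of a round
trip per operation:

    HClient client = new HClient(conf);
    client.openTable(new Text("test"));
    client.startBatchUpdate();                          // enter batch mode
    long lockid = client.startUpdate(new Text("row1"));
    client.put(lockid, new Text("contents:"), value);   // buffered, no RPC yet
    lockid = client.startUpdate(new Text("row2"));
    client.put(lockid, new Text("contents:"), value);
    client.commitBatch();                               // one RPC per region server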