HBASE-950 HTable.commit no longer works with existing RowLocks though it's still in API
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@707225 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4357226cd4
commit
d0ce40d1ec
|
@ -38,6 +38,7 @@ Release 0.19.0 - Unreleased
|
||||||
HBASE-939 NPE in HStoreKey
|
HBASE-939 NPE in HStoreKey
|
||||||
HBASE-945 Be consistent in use of qualified/unqualified mapfile paths
|
HBASE-945 Be consistent in use of qualified/unqualified mapfile paths
|
||||||
HBASE-946 Row with 55k deletes timesout scanner lease
|
HBASE-946 Row with 55k deletes timesout scanner lease
|
||||||
|
HBASE-950 HTable.commit no longer works with existing RowLocks though it's still in API
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
HBASE-901 Add a limit to key length, check key and value length on client side
|
HBASE-901 Add a limit to key length, check key and value length on client side
|
||||||
|
|
|
@ -994,6 +994,9 @@ public class HTable {
|
||||||
final RowLock rl)
|
final RowLock rl)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
checkRowAndColumns(batchUpdate);
|
checkRowAndColumns(batchUpdate);
|
||||||
|
if(rl != null) {
|
||||||
|
batchUpdate.setRowLock(rl.getLockId());
|
||||||
|
}
|
||||||
writeBuffer.add(batchUpdate);
|
writeBuffer.add(batchUpdate);
|
||||||
currentWriteBufferSize += batchUpdate.getSize();
|
currentWriteBufferSize += batchUpdate.getSize();
|
||||||
if(autoFlush || currentWriteBufferSize > writeBufferSize) {
|
if(autoFlush || currentWriteBufferSize > writeBufferSize) {
|
||||||
|
|
|
@ -27,6 +27,7 @@ import java.util.Arrays;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.client.RowLock;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.io.WritableComparable;
|
import org.apache.hadoop.io.WritableComparable;
|
||||||
|
|
||||||
|
@ -50,6 +51,8 @@ public class BatchUpdate implements WritableComparable<BatchUpdate>,
|
||||||
|
|
||||||
private long timestamp = HConstants.LATEST_TIMESTAMP;
|
private long timestamp = HConstants.LATEST_TIMESTAMP;
|
||||||
|
|
||||||
|
private long rowLock = -1l;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default constructor used serializing. Do not use directly.
|
* Default constructor used serializing. Do not use directly.
|
||||||
*/
|
*/
|
||||||
|
@ -99,6 +102,22 @@ public class BatchUpdate implements WritableComparable<BatchUpdate>,
|
||||||
this.operations = new ArrayList<BatchOperation>();
|
this.operations = new ArrayList<BatchOperation>();
|
||||||
this.size = (row == null)? 0: row.length;
|
this.size = (row == null)? 0: row.length;
|
||||||
}
|
}
|
||||||
|
/**
|
||||||
|
* Get the row lock associated with this update
|
||||||
|
* @return the row lock
|
||||||
|
*/
|
||||||
|
public long getRowLock() {
|
||||||
|
return rowLock;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the lock to be used for this update
|
||||||
|
* @param rowLock the row lock
|
||||||
|
*/
|
||||||
|
public void setRowLock(long rowLock) {
|
||||||
|
this.rowLock = rowLock;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/** @return the row */
|
/** @return the row */
|
||||||
public byte [] getRow() {
|
public byte [] getRow() {
|
||||||
|
@ -283,6 +302,7 @@ public class BatchUpdate implements WritableComparable<BatchUpdate>,
|
||||||
op.readFields(in);
|
op.readFields(in);
|
||||||
this.operations.add(op);
|
this.operations.add(op);
|
||||||
}
|
}
|
||||||
|
this.rowLock = in.readLong();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void write(final DataOutput out) throws IOException {
|
public void write(final DataOutput out) throws IOException {
|
||||||
|
@ -293,6 +313,7 @@ public class BatchUpdate implements WritableComparable<BatchUpdate>,
|
||||||
for (BatchOperation op: operations) {
|
for (BatchOperation op: operations) {
|
||||||
op.write(out);
|
op.write(out);
|
||||||
}
|
}
|
||||||
|
out.writeLong(this.rowLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
public int compareTo(BatchUpdate o) {
|
public int compareTo(BatchUpdate o) {
|
||||||
|
|
|
@ -1358,11 +1358,12 @@ public class HRegion implements HConstants {
|
||||||
* blocked while updating.
|
* blocked while updating.
|
||||||
* @param bus
|
* @param bus
|
||||||
*/
|
*/
|
||||||
public void batchUpdate(BatchUpdate[] bus) throws IOException {
|
public void batchUpdate(BatchUpdate[] bus, Integer[] locks)
|
||||||
|
throws IOException {
|
||||||
splitsAndClosesLock.readLock().lock();
|
splitsAndClosesLock.readLock().lock();
|
||||||
try {
|
try {
|
||||||
for (BatchUpdate bu : bus) {
|
for (int i = 0; i < bus.length; i++) {
|
||||||
batchUpdate(bu, null);
|
batchUpdate(bus[i], locks[i]);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
splitsAndClosesLock.readLock().unlock();
|
splitsAndClosesLock.readLock().unlock();
|
||||||
|
|
|
@ -1147,7 +1147,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
||||||
validateValuesLength(b, region);
|
validateValuesLength(b, region);
|
||||||
try {
|
try {
|
||||||
cacheFlusher.reclaimMemcacheMemory();
|
cacheFlusher.reclaimMemcacheMemory();
|
||||||
region.batchUpdate(b, getLockFromId(lockId));
|
region.batchUpdate(b, getLockFromId(b.getRowLock()));
|
||||||
} catch (OutOfMemoryError error) {
|
} catch (OutOfMemoryError error) {
|
||||||
abort();
|
abort();
|
||||||
LOG.fatal("Ran out of memory", error);
|
LOG.fatal("Ran out of memory", error);
|
||||||
|
@ -1164,12 +1164,14 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
||||||
try {
|
try {
|
||||||
HRegion region = getRegion(regionName);
|
HRegion region = getRegion(regionName);
|
||||||
this.cacheFlusher.reclaimMemcacheMemory();
|
this.cacheFlusher.reclaimMemcacheMemory();
|
||||||
for (BatchUpdate batchUpdate : b) {
|
Integer[] locks = new Integer[b.length];
|
||||||
|
for (int j = 0; j < b.length; j++) {
|
||||||
this.requestCount.incrementAndGet();
|
this.requestCount.incrementAndGet();
|
||||||
validateValuesLength(batchUpdate, region);
|
validateValuesLength(b[j], region);
|
||||||
|
locks[j] = getLockFromId(b[j].getRowLock());
|
||||||
}
|
}
|
||||||
i+= b.length-1;
|
i+= b.length-1;
|
||||||
region.batchUpdate(b);
|
region.batchUpdate(b, locks);
|
||||||
} catch (OutOfMemoryError error) {
|
} catch (OutOfMemoryError error) {
|
||||||
abort();
|
abort();
|
||||||
LOG.fatal("Ran out of memory", error);
|
LOG.fatal("Ran out of memory", error);
|
||||||
|
|
Loading…
Reference in New Issue