HBASE-1705 Thrift server: deletes in mutateRow/s don't delete

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@798227 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2009-07-27 17:26:10 +00:00
parent d38846e59a
commit 01a60aa374
2 changed files with 26 additions and 7 deletions

View File

@ -291,6 +291,8 @@ Release 0.20.0 - Unreleased
HBASE-1692 Web UI is extremely slow / freezes up if you have many tables HBASE-1692 Web UI is extremely slow / freezes up if you have many tables
HBASE-1686 major compaction can create empty store files, causing AIOOB HBASE-1686 major compaction can create empty store files, causing AIOOB
when trying to read when trying to read
HBASE-1705 Thrift server: deletes in mutateRow/s don't delete
(Tim Sell and Ryan Rawson via Stack)
IMPROVEMENTS IMPROVEMENTS
HBASE-1089 Add count of regions on filesystem to master UI; add percentage HBASE-1089 Add count of regions on filesystem to master UI; add percentage

View File

@ -444,11 +444,24 @@ public class ThriftServer {
table = getTable(tableName); table = getTable(tableName);
Put put = new Put(row); Put put = new Put(row);
put.setTimeStamp(timestamp); put.setTimeStamp(timestamp);
Delete delete = new Delete(row);
for (Mutation m : mutations) { for (Mutation m : mutations) {
byte [][] famAndQf = KeyValue.parseColumn(m.column); byte[][] famAndQf = KeyValue.parseColumn(m.column);
put.add(famAndQf[0], famAndQf[1], m.value); if (m.isDelete) {
if (famAndQf[1].length == 0)
delete.deleteFamily(famAndQf[0], timestamp);
else
delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
} else {
put.add(famAndQf[0], famAndQf[1], m.value);
}
} }
table.put(put); if (!delete.isEmpty())
table.delete(delete);
if (!put.isEmpty())
table.put(put);
} catch (IOException e) { } catch (IOException e) {
throw new IOError(e.getMessage()); throw new IOError(e.getMessage());
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
@ -473,9 +486,13 @@ public class ThriftServer {
Put put = new Put(row); Put put = new Put(row);
put.setTimeStamp(timestamp); put.setTimeStamp(timestamp);
for (Mutation m : mutations) { for (Mutation m : mutations) {
byte [][] famAndQf = KeyValue.parseColumn(m.column); byte[][] famAndQf = KeyValue.parseColumn(m.column);
if (m.isDelete) { if (m.isDelete) {
delete.deleteColumns(famAndQf[0], famAndQf[1]); // no qualifier, family only.
if (famAndQf[1].length == 0)
delete.deleteFamily(famAndQf[0], timestamp);
else
delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
} else { } else {
put.add(famAndQf[0], famAndQf[1], m.value); put.add(famAndQf[0], famAndQf[1], m.value);
} }