HBASE-4720 Implement atomic update operations (checkAndPut, checkAndDelete) for REST client/server
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1344398 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2bbc95823c
commit
b61974d1ce
|
@ -55,8 +55,12 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
public class RowResource extends ResourceBase {
|
||||
private static final Log LOG = LogFactory.getLog(RowResource.class);
|
||||
|
||||
static final String CHECK_PUT = "put";
|
||||
static final String CHECK_DELETE = "delete";
|
||||
|
||||
TableResource tableResource;
|
||||
RowSpec rowspec;
|
||||
private String check = null;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
|
@ -66,13 +70,14 @@ public class RowResource extends ResourceBase {
|
|||
* @throws IOException
|
||||
*/
|
||||
public RowResource(TableResource tableResource, String rowspec,
|
||||
String versions) throws IOException {
|
||||
String versions, String check) throws IOException {
|
||||
super();
|
||||
this.tableResource = tableResource;
|
||||
this.rowspec = new RowSpec(rowspec);
|
||||
if (versions != null) {
|
||||
this.rowspec.setMaxVersions(Integer.valueOf(versions));
|
||||
}
|
||||
this.check = check;
|
||||
}
|
||||
|
||||
@GET
|
||||
|
@ -151,6 +156,15 @@ public class RowResource extends ResourceBase {
|
|||
if (servlet.isReadOnly()) {
|
||||
throw new WebApplicationException(Response.Status.FORBIDDEN);
|
||||
}
|
||||
|
||||
if (CHECK_PUT.equalsIgnoreCase(check)) {
|
||||
return checkAndPut(model);
|
||||
} else if (CHECK_DELETE.equalsIgnoreCase(check)) {
|
||||
return checkAndDelete(model);
|
||||
} else if (check != null && check.length() > 0) {
|
||||
LOG.warn("Unknown check value: " + check + ", ignored");
|
||||
}
|
||||
|
||||
HTablePool pool = servlet.getTablePool();
|
||||
HTableInterface table = null;
|
||||
try {
|
||||
|
@ -277,7 +291,8 @@ public class RowResource extends ResourceBase {
|
|||
public Response put(final CellSetModel model,
|
||||
final @Context UriInfo uriInfo) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("PUT " + uriInfo.getAbsolutePath());
|
||||
LOG.debug("PUT " + uriInfo.getAbsolutePath()
|
||||
+ " " + uriInfo.getQueryParameters());
|
||||
}
|
||||
return update(model, true);
|
||||
}
|
||||
|
@ -297,7 +312,8 @@ public class RowResource extends ResourceBase {
|
|||
public Response post(final CellSetModel model,
|
||||
final @Context UriInfo uriInfo) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("POST " + uriInfo.getAbsolutePath());
|
||||
LOG.debug("POST " + uriInfo.getAbsolutePath()
|
||||
+ " " + uriInfo.getQueryParameters());
|
||||
}
|
||||
return update(model, false);
|
||||
}
|
||||
|
@ -368,4 +384,146 @@ public class RowResource extends ResourceBase {
|
|||
}
|
||||
return Response.ok().build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the input request parameters, parses columns from CellSetModel,
|
||||
* and invokes checkAndPut on HTable.
|
||||
*
|
||||
* @param model instance of CellSetModel
|
||||
* @return Response 200 OK, 304 Not modified, 400 Bad request
|
||||
*/
|
||||
Response checkAndPut(final CellSetModel model) {
|
||||
HTablePool pool = servlet.getTablePool();
|
||||
HTableInterface table = null;
|
||||
try {
|
||||
if (model.getRows().size() != 1) {
|
||||
throw new WebApplicationException(Response.Status.BAD_REQUEST);
|
||||
}
|
||||
|
||||
RowModel rowModel = model.getRows().get(0);
|
||||
byte[] key = rowModel.getKey();
|
||||
if (key == null) {
|
||||
key = rowspec.getRow();
|
||||
}
|
||||
|
||||
List<CellModel> cellModels = rowModel.getCells();
|
||||
int cellModelCount = cellModels.size();
|
||||
if (key == null || cellModelCount <= 1) {
|
||||
throw new WebApplicationException(Response.Status.BAD_REQUEST);
|
||||
}
|
||||
|
||||
Put put = new Put(key);
|
||||
CellModel valueToCheckCell = cellModels.get(cellModelCount - 1);
|
||||
byte[] valueToCheckColumn = valueToCheckCell.getColumn();
|
||||
byte[][] valueToPutParts = KeyValue.parseColumn(valueToCheckColumn);
|
||||
if (valueToPutParts.length == 2 && valueToPutParts[1].length > 0) {
|
||||
CellModel valueToPutCell = null;
|
||||
for (int i = 0, n = cellModelCount - 1; i < n ; i++) {
|
||||
if(Bytes.equals(cellModels.get(i).getColumn(),
|
||||
valueToCheckCell.getColumn())) {
|
||||
valueToPutCell = cellModels.get(i);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (valueToPutCell != null) {
|
||||
put.add(valueToPutParts[0], valueToPutParts[1], valueToPutCell
|
||||
.getTimestamp(), valueToPutCell.getValue());
|
||||
} else {
|
||||
throw new WebApplicationException(Response.Status.BAD_REQUEST);
|
||||
}
|
||||
} else {
|
||||
throw new WebApplicationException(Response.Status.BAD_REQUEST);
|
||||
}
|
||||
|
||||
table = pool.getTable(this.tableResource.getName());
|
||||
boolean retValue = table.checkAndPut(key, valueToPutParts[0],
|
||||
valueToPutParts[1], valueToCheckCell.getValue(), put);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("CHECK-AND-PUT " + put.toString() + ", returns " + retValue);
|
||||
}
|
||||
table.flushCommits();
|
||||
ResponseBuilder response = Response.ok();
|
||||
if (!retValue) {
|
||||
response = Response.status(304);
|
||||
}
|
||||
return response.build();
|
||||
} catch (IOException e) {
|
||||
throw new WebApplicationException(e, Response.Status.SERVICE_UNAVAILABLE);
|
||||
} finally {
|
||||
try {
|
||||
if(table != null){
|
||||
pool.putTable(table);
|
||||
}
|
||||
} catch (Exception ioe) {
|
||||
throw new WebApplicationException(ioe,
|
||||
Response.Status.SERVICE_UNAVAILABLE);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the input request parameters, parses columns from CellSetModel,
|
||||
* and invokes checkAndDelete on HTable.
|
||||
*
|
||||
* @param model instance of CellSetModel
|
||||
* @return Response 200 OK, 304 Not modified, 400 Bad request
|
||||
*/
|
||||
Response checkAndDelete(final CellSetModel model) {
|
||||
HTablePool pool = servlet.getTablePool();
|
||||
HTableInterface table = null;
|
||||
Delete delete = null;
|
||||
try {
|
||||
if (model.getRows().size() != 1) {
|
||||
throw new WebApplicationException(Response.Status.BAD_REQUEST);
|
||||
}
|
||||
RowModel rowModel = model.getRows().get(0);
|
||||
byte[] key = rowModel.getKey();
|
||||
if (key == null) {
|
||||
key = rowspec.getRow();
|
||||
}
|
||||
if (key == null) {
|
||||
throw new WebApplicationException(Response.Status.BAD_REQUEST);
|
||||
}
|
||||
|
||||
delete = new Delete(key);
|
||||
CellModel valueToDeleteCell = rowModel.getCells().get(0);
|
||||
byte[] valueToDeleteColumn = valueToDeleteCell.getColumn();
|
||||
if (valueToDeleteColumn == null) {
|
||||
try {
|
||||
valueToDeleteColumn = rowspec.getColumns()[0];
|
||||
} catch (final ArrayIndexOutOfBoundsException e) {
|
||||
throw new WebApplicationException(Response.Status.BAD_REQUEST);
|
||||
}
|
||||
}
|
||||
byte[][] parts = KeyValue.parseColumn(valueToDeleteColumn);
|
||||
if (parts.length == 2 && parts[1].length > 0) {
|
||||
delete.deleteColumns(parts[0], parts[1]);
|
||||
} else {
|
||||
throw new WebApplicationException(Response.Status.BAD_REQUEST);
|
||||
}
|
||||
|
||||
table = pool.getTable(tableResource.getName());
|
||||
boolean retValue = table.checkAndDelete(key, parts[0], parts[1],
|
||||
valueToDeleteCell.getValue(), delete);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("CHECK-AND-DELETE " + delete.toString() + ", returns "
|
||||
+ retValue);
|
||||
}
|
||||
table.flushCommits();
|
||||
ResponseBuilder response = Response.ok();
|
||||
if (!retValue) {
|
||||
response = Response.status(304);
|
||||
}
|
||||
return response.build();
|
||||
} catch (IOException e) {
|
||||
throw new WebApplicationException(e, Response.Status.SERVICE_UNAVAILABLE);
|
||||
} finally {
|
||||
try {
|
||||
pool.putTable(table);
|
||||
} catch (Exception ioe) {
|
||||
throw new WebApplicationException(ioe,
|
||||
Response.Status.SERVICE_UNAVAILABLE);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -94,7 +94,8 @@ public class TableResource extends ResourceBase {
|
|||
// We need the @Encoded decorator so Jersey won't urldecode before
|
||||
// the RowSpec constructor has a chance to parse
|
||||
final @PathParam("rowspec") @Encoded String rowspec,
|
||||
final @QueryParam("v") String versions) throws IOException {
|
||||
return new RowResource(this, rowspec, versions);
|
||||
final @QueryParam("v") String versions,
|
||||
final @QueryParam("check") String check) throws IOException {
|
||||
return new RowResource(this, rowspec, versions, check);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -589,12 +589,80 @@ public class RemoteHTable implements HTableInterface {
|
|||
|
||||
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
|
||||
byte[] value, Put put) throws IOException {
|
||||
throw new IOException("checkAndPut not supported");
|
||||
// column to check-the-value
|
||||
put.add(new KeyValue(row, family, qualifier, value));
|
||||
|
||||
CellSetModel model = buildModelFromPut(put);
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append('/');
|
||||
if (accessToken != null) {
|
||||
sb.append(accessToken);
|
||||
sb.append('/');
|
||||
}
|
||||
sb.append(Bytes.toStringBinary(name));
|
||||
sb.append('/');
|
||||
sb.append(Bytes.toStringBinary(put.getRow()));
|
||||
sb.append("?check=put");
|
||||
|
||||
for (int i = 0; i < maxRetries; i++) {
|
||||
Response response = client.put(sb.toString(),
|
||||
Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput());
|
||||
int code = response.getCode();
|
||||
switch (code) {
|
||||
case 200:
|
||||
return true;
|
||||
case 304: // NOT-MODIFIED
|
||||
return false;
|
||||
case 509:
|
||||
try {
|
||||
Thread.sleep(sleepTime);
|
||||
} catch (final InterruptedException e) {
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new IOException("checkAndPut request failed with " + code);
|
||||
}
|
||||
}
|
||||
throw new IOException("checkAndPut request timed out");
|
||||
}
|
||||
|
||||
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
|
||||
byte[] value, Delete delete) throws IOException {
|
||||
throw new IOException("checkAndDelete not supported");
|
||||
Put put = new Put(row);
|
||||
// column to check-the-value
|
||||
put.add(new KeyValue(row, family, qualifier, value));
|
||||
CellSetModel model = buildModelFromPut(put);
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append('/');
|
||||
if (accessToken != null) {
|
||||
sb.append(accessToken);
|
||||
sb.append('/');
|
||||
}
|
||||
sb.append(Bytes.toStringBinary(name));
|
||||
sb.append('/');
|
||||
sb.append(Bytes.toStringBinary(row));
|
||||
sb.append("?check=delete");
|
||||
|
||||
for (int i = 0; i < maxRetries; i++) {
|
||||
Response response = client.put(sb.toString(),
|
||||
Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput());
|
||||
int code = response.getCode();
|
||||
switch (code) {
|
||||
case 200:
|
||||
return true;
|
||||
case 304: // NOT-MODIFIED
|
||||
return false;
|
||||
case 509:
|
||||
try {
|
||||
Thread.sleep(sleepTime);
|
||||
} catch (final InterruptedException e) {
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new IOException("checkAndDelete request failed with " + code);
|
||||
}
|
||||
}
|
||||
throw new IOException("checkAndDelete request timed out");
|
||||
}
|
||||
|
||||
public Result increment(Increment increment) throws IOException {
|
||||
|
|
|
@ -265,11 +265,120 @@ public class TestRowResource {
|
|||
assertEquals(Bytes.toString(cell.getValue()), value);
|
||||
}
|
||||
|
||||
private static Response checkAndPutValuePB(String url, String table,
|
||||
String row, String column, String valueToCheck, String valueToPut)
|
||||
throws IOException {
|
||||
RowModel rowModel = new RowModel(row);
|
||||
rowModel.addCell(new CellModel(Bytes.toBytes(column),
|
||||
Bytes.toBytes(valueToPut)));
|
||||
rowModel.addCell(new CellModel(Bytes.toBytes(column),
|
||||
Bytes.toBytes(valueToCheck)));
|
||||
CellSetModel cellSetModel = new CellSetModel();
|
||||
cellSetModel.addRow(rowModel);
|
||||
Response response = client.put(url, Constants.MIMETYPE_PROTOBUF,
|
||||
cellSetModel.createProtobufOutput());
|
||||
Thread.yield();
|
||||
return response;
|
||||
}
|
||||
|
||||
private static Response checkAndPutValuePB(String table, String row,
|
||||
String column, String valueToCheck, String valueToPut) throws IOException {
|
||||
StringBuilder path = new StringBuilder();
|
||||
path.append('/');
|
||||
path.append(table);
|
||||
path.append('/');
|
||||
path.append(row);
|
||||
path.append("?check=put");
|
||||
return checkAndPutValuePB(path.toString(), table, row, column,
|
||||
valueToCheck, valueToPut);
|
||||
}
|
||||
|
||||
private static Response checkAndPutValueXML(String url, String table,
|
||||
String row, String column, String valueToCheck, String valueToPut)
|
||||
throws IOException, JAXBException {
|
||||
RowModel rowModel = new RowModel(row);
|
||||
rowModel.addCell(new CellModel(Bytes.toBytes(column),
|
||||
Bytes.toBytes(valueToPut)));
|
||||
rowModel.addCell(new CellModel(Bytes.toBytes(column),
|
||||
Bytes.toBytes(valueToCheck)));
|
||||
CellSetModel cellSetModel = new CellSetModel();
|
||||
cellSetModel.addRow(rowModel);
|
||||
StringWriter writer = new StringWriter();
|
||||
marshaller.marshal(cellSetModel, writer);
|
||||
Response response = client.put(url, Constants.MIMETYPE_XML,
|
||||
Bytes.toBytes(writer.toString()));
|
||||
Thread.yield();
|
||||
return response;
|
||||
}
|
||||
|
||||
private static Response checkAndPutValueXML(String table, String row,
|
||||
String column, String valueToCheck, String valueToPut)
|
||||
throws IOException, JAXBException {
|
||||
StringBuilder path = new StringBuilder();
|
||||
path.append('/');
|
||||
path.append(table);
|
||||
path.append('/');
|
||||
path.append(row);
|
||||
path.append("?check=put");
|
||||
return checkAndPutValueXML(path.toString(), table, row, column,
|
||||
valueToCheck, valueToPut);
|
||||
}
|
||||
|
||||
private static Response checkAndDeleteXML(String url, String table,
|
||||
String row, String column, String valueToCheck)
|
||||
throws IOException, JAXBException {
|
||||
RowModel rowModel = new RowModel(row);
|
||||
rowModel.addCell(new CellModel(Bytes.toBytes(column),
|
||||
Bytes.toBytes(valueToCheck)));
|
||||
CellSetModel cellSetModel = new CellSetModel();
|
||||
cellSetModel.addRow(rowModel);
|
||||
StringWriter writer = new StringWriter();
|
||||
marshaller.marshal(cellSetModel, writer);
|
||||
Response response = client.put(url, Constants.MIMETYPE_XML,
|
||||
Bytes.toBytes(writer.toString()));
|
||||
Thread.yield();
|
||||
return response;
|
||||
}
|
||||
|
||||
private static Response checkAndDeleteXML(String table, String row,
|
||||
String column, String valueToCheck) throws IOException, JAXBException {
|
||||
StringBuilder path = new StringBuilder();
|
||||
path.append('/');
|
||||
path.append(table);
|
||||
path.append('/');
|
||||
path.append(row);
|
||||
path.append("?check=delete");
|
||||
return checkAndDeleteXML(path.toString(), table, row, column, valueToCheck);
|
||||
}
|
||||
|
||||
private static Response checkAndDeletePB(String table, String row,
|
||||
String column, String value) throws IOException {
|
||||
StringBuilder path = new StringBuilder();
|
||||
path.append('/');
|
||||
path.append(table);
|
||||
path.append('/');
|
||||
path.append(row);
|
||||
path.append("?check=delete");
|
||||
return checkAndDeleteValuePB(path.toString(), table, row, column, value);
|
||||
}
|
||||
|
||||
private static Response checkAndDeleteValuePB(String url, String table,
|
||||
String row, String column, String valueToCheck)
|
||||
throws IOException {
|
||||
RowModel rowModel = new RowModel(row);
|
||||
rowModel.addCell(new CellModel(Bytes.toBytes(column), Bytes
|
||||
.toBytes(valueToCheck)));
|
||||
CellSetModel cellSetModel = new CellSetModel();
|
||||
cellSetModel.addRow(rowModel);
|
||||
Response response = client.put(url, Constants.MIMETYPE_PROTOBUF,
|
||||
cellSetModel.createProtobufOutput());
|
||||
Thread.yield();
|
||||
return response;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDelete() throws IOException, JAXBException {
|
||||
Response response;
|
||||
|
||||
response = putValueXML(TABLE, ROW_1, COLUMN_1, VALUE_1);
|
||||
Response response = putValueXML(TABLE, ROW_1, COLUMN_1, VALUE_1);
|
||||
assertEquals(response.getCode(), 200);
|
||||
response = putValueXML(TABLE, ROW_1, COLUMN_2, VALUE_2);
|
||||
assertEquals(response.getCode(), 200);
|
||||
|
@ -282,6 +391,13 @@ public class TestRowResource {
|
|||
assertEquals(response.getCode(), 404);
|
||||
checkValueXML(TABLE, ROW_1, COLUMN_2, VALUE_2);
|
||||
|
||||
response = putValueXML(TABLE, ROW_1, COLUMN_1, VALUE_1);
|
||||
assertEquals(response.getCode(), 200);
|
||||
response = checkAndDeletePB(TABLE, ROW_1, COLUMN_1, VALUE_1);
|
||||
assertEquals(response.getCode(), 200);
|
||||
response = getValueXML(TABLE, ROW_1, COLUMN_1);
|
||||
assertEquals(response.getCode(), 404);
|
||||
|
||||
response = deleteRow(TABLE, ROW_1);
|
||||
assertEquals(response.getCode(), 200);
|
||||
response = getValueXML(TABLE, ROW_1, COLUMN_1);
|
||||
|
@ -292,16 +408,20 @@ public class TestRowResource {
|
|||
|
||||
@Test
|
||||
public void testForbidden() throws IOException, JAXBException {
|
||||
Response response;
|
||||
|
||||
conf.set("hbase.rest.readonly", "true");
|
||||
|
||||
response = putValueXML(TABLE, ROW_1, COLUMN_1, VALUE_1);
|
||||
Response response = putValueXML(TABLE, ROW_1, COLUMN_1, VALUE_1);
|
||||
assertEquals(response.getCode(), 403);
|
||||
response = putValuePB(TABLE, ROW_1, COLUMN_1, VALUE_1);
|
||||
assertEquals(response.getCode(), 403);
|
||||
response = checkAndPutValueXML(TABLE, ROW_1, COLUMN_1, VALUE_1, VALUE_2);
|
||||
assertEquals(response.getCode(), 403);
|
||||
response = checkAndPutValuePB(TABLE, ROW_1, COLUMN_1, VALUE_1, VALUE_2);
|
||||
assertEquals(response.getCode(), 403);
|
||||
response = deleteValue(TABLE, ROW_1, COLUMN_1);
|
||||
assertEquals(response.getCode(), 403);
|
||||
response = checkAndDeletePB(TABLE, ROW_1, COLUMN_1, VALUE_1);
|
||||
assertEquals(response.getCode(), 403);
|
||||
response = deleteRow(TABLE, ROW_1);
|
||||
assertEquals(response.getCode(), 403);
|
||||
|
||||
|
@ -311,6 +431,10 @@ public class TestRowResource {
|
|||
assertEquals(response.getCode(), 200);
|
||||
response = putValuePB(TABLE, ROW_1, COLUMN_1, VALUE_1);
|
||||
assertEquals(response.getCode(), 200);
|
||||
response = checkAndPutValueXML(TABLE, ROW_1, COLUMN_1, VALUE_1, VALUE_2);
|
||||
assertEquals(response.getCode(), 200);
|
||||
response = checkAndPutValuePB(TABLE, ROW_1, COLUMN_1, VALUE_2, VALUE_3);
|
||||
assertEquals(response.getCode(), 200);
|
||||
response = deleteValue(TABLE, ROW_1, COLUMN_1);
|
||||
assertEquals(response.getCode(), 200);
|
||||
response = deleteRow(TABLE, ROW_1);
|
||||
|
@ -328,6 +452,11 @@ public class TestRowResource {
|
|||
response = putValueXML(TABLE, ROW_1, COLUMN_1, VALUE_2);
|
||||
assertEquals(response.getCode(), 200);
|
||||
checkValueXML(TABLE, ROW_1, COLUMN_1, VALUE_2);
|
||||
response = checkAndPutValueXML(TABLE, ROW_1, COLUMN_1, VALUE_2, VALUE_3);
|
||||
assertEquals(response.getCode(), 200);
|
||||
checkValueXML(TABLE, ROW_1, COLUMN_1, VALUE_3);
|
||||
response = checkAndDeleteXML(TABLE, ROW_1, COLUMN_1, VALUE_3);
|
||||
assertEquals(response.getCode(), 200);
|
||||
|
||||
response = deleteRow(TABLE, ROW_1);
|
||||
assertEquals(response.getCode(), 200);
|
||||
|
@ -349,6 +478,13 @@ public class TestRowResource {
|
|||
assertEquals(response.getCode(), 200);
|
||||
checkValuePB(TABLE, ROW_1, COLUMN_1, VALUE_2);
|
||||
|
||||
response = checkAndPutValuePB(TABLE, ROW_1, COLUMN_1, VALUE_2, VALUE_3);
|
||||
assertEquals(response.getCode(), 200);
|
||||
checkValuePB(TABLE, ROW_1, COLUMN_1, VALUE_3);
|
||||
response = checkAndPutValueXML(TABLE, ROW_1, COLUMN_1, VALUE_3, VALUE_4);
|
||||
assertEquals(response.getCode(), 200);
|
||||
checkValuePB(TABLE, ROW_1, COLUMN_1, VALUE_4);
|
||||
|
||||
response = deleteRow(TABLE, ROW_1);
|
||||
assertEquals(response.getCode(), 200);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue