NIFI-3538 Added DeleteHBaseRow

This closes #2294.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
Mike Thomsen 2018-02-13 13:12:00 -05:00 committed by Koji Kawamura
parent 28067a29fd
commit 143d7e6829
7 changed files with 580 additions and 1 deletion

AbstractDeleteHBase.java

@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public abstract class AbstractDeleteHBase extends AbstractProcessor {
protected static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("HBase Client Service")
.description("Specifies the Controller Service to use for accessing HBase.")
.required(true)
.identifiesControllerService(HBaseClientService.class)
.build();
protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("Table Name")
.description("The name of the HBase Table.")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor ROW_ID = new PropertyDescriptor.Builder()
.name("Row Identifier")
.description("Specifies the Row ID to use when deleting data into HBase")
.required(false) // not all sub-classes will require this
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after it has been successfully stored in HBase")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if it cannot be sent to HBase")
.build();
protected HBaseClientService clientService;
@OnScheduled
public void onScheduled(final ProcessContext context) {
clientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
}
@Override
public Set<Relationship> getRelationships() {
Set<Relationship> set = new HashSet<>();
set.add(REL_SUCCESS);
set.add(REL_FAILURE);
return set;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(HBASE_CLIENT_SERVICE);
properties.add(TABLE_NAME);
properties.add(ROW_ID);
return properties;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
try {
doDelete(context, session);
} catch (Exception e) {
getLogger().error("Error", e);
} finally {
session.commit();
}
}
protected abstract void doDelete(ProcessContext context, ProcessSession session) throws Exception;
}
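The base class delegates fetching, routing, and batching entirely to doDelete. A minimal hypothetical subclass, not part of this commit, that deletes one row per FlowFile might look like the following (it assumes the optional Row Identifier property is set):

import java.nio.charset.StandardCharsets;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;

// Hypothetical illustration only; not part of this commit.
public class DeleteSingleHBaseRow extends AbstractDeleteHBase {
    @Override
    protected void doDelete(ProcessContext context, ProcessSession session) throws Exception {
        final FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        // Resolve the table name and row ID against the FlowFile's attributes.
        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
        final String rowId = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
        // Single-row delete through the shared client service.
        clientService.delete(tableName, rowId.getBytes(StandardCharsets.UTF_8));
        session.transfer(flowFile, REL_SUCCESS);
    }
}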

DeleteHBaseRow.java

@@ -0,0 +1,215 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
@WritesAttributes(
value = {
@WritesAttribute( attribute = "restart.index", description = "If a delete batch fails, 'restart.index' attribute is added to the FlowFile and sent to 'failure' " +
"relationship, so that this processor can retry from there when the same FlowFile is routed again." ),
@WritesAttribute( attribute = "rowkey.start", description = "The first rowkey in the flowfile. Only written when using the flowfile's content for the row IDs."),
@WritesAttribute( attribute = "rowkey.end", description = "The last rowkey in the flowfile. Only written when using the flowfile's content for the row IDs.")
}
)
@Tags({ "delete", "hbase" })
@CapabilityDescription(
"Delete HBase records individually or in batches. The input can be a single row ID in the flowfile content, one ID per line, " +
"row IDs separated by a configurable separator character (default is a comma). ")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
public class DeleteHBaseRow extends AbstractDeleteHBase {
static final AllowableValue ROW_ID_CONTENT = new AllowableValue("content", "FlowFile content", "Get the row key(s) from the flowfile content.");
static final AllowableValue ROW_ID_ATTR = new AllowableValue("attr", "FlowFile attributes", "Get the row key from an expression language statement.");
static final String RESTART_INDEX = "restart.index";
static final String ROWKEY_START = "rowkey.start";
static final String ROWKEY_END = "rowkey.end";
static final PropertyDescriptor ROW_ID_LOCATION = new PropertyDescriptor.Builder()
.name("delete-hb-row-id-location")
.displayName("Row ID Location")
.description("The location of the row ID to use for building the delete. Can be from the content or an expression language statement.")
.required(true)
.defaultValue(ROW_ID_CONTENT.getValue())
.allowableValues(ROW_ID_CONTENT, ROW_ID_ATTR)
.addValidator(Validator.VALID)
.build();
static final PropertyDescriptor FLOWFILE_FETCH_COUNT = new PropertyDescriptor.Builder()
.name("delete-hb-flowfile-fetch-count")
.displayName("Flowfile Fetch Count")
.description("The number of flowfiles to fetch per run.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("5")
.expressionLanguageSupported(false)
.build();
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("delete-hb-batch-size")
.displayName("Batch Size")
.description("The number of deletes to send per batch.")
.required(true)
.defaultValue("50")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(false)
.build();
static final PropertyDescriptor KEY_SEPARATOR = new PropertyDescriptor.Builder()
.name("delete-hb-separator")
.displayName("Delete Row Key Separator")
.description("The separator character(s) that separate multiple row keys " +
"when multiple row keys are provided in the flowfile content")
.required(true)
.defaultValue(",")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("delete-char-set")
.displayName("Character Set")
.description("The character set used to encode the row key for HBase.")
.required(true)
.defaultValue("UTF-8")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = super.getSupportedPropertyDescriptors();
properties.add(ROW_ID_LOCATION);
properties.add(FLOWFILE_FETCH_COUNT);
properties.add(BATCH_SIZE);
properties.add(KEY_SEPARATOR);
properties.add(CHARSET);
return properties;
}
@Override
protected void doDelete(ProcessContext context, ProcessSession session) throws Exception {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
final String location = context.getProperty(ROW_ID_LOCATION).getValue();
final int flowFileCount = context.getProperty(FLOWFILE_FETCH_COUNT).asInteger();
final String charset = context.getProperty(CHARSET).getValue();
List<FlowFile> flowFiles = session.get(flowFileCount);
if (flowFiles != null && flowFiles.size() > 0) {
for (int index = 0; index < flowFiles.size(); index++) {
FlowFile flowFile = flowFiles.get(index);
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
try {
if (location.equals(ROW_ID_CONTENT.getValue())) {
flowFile = doDeleteFromContent(flowFile, context, session, tableName, batchSize, charset);
if (flowFile.getAttribute(RESTART_INDEX) != null) {
session.transfer(flowFile, REL_FAILURE);
} else {
final String transitUrl = clientService.toTransitUri(tableName, flowFile.getAttribute(ROWKEY_END));
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().invokeRemoteProcess(flowFile, transitUrl);
}
} else {
String transitUrl = doDeleteFromAttribute(flowFile, context, tableName, charset);
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().invokeRemoteProcess(flowFile, transitUrl);
}
} catch (Exception ex) {
getLogger().error(ex.getMessage(), ex);
session.transfer(flowFile, REL_FAILURE);
}
}
}
}
private String doDeleteFromAttribute(FlowFile flowFile, ProcessContext context, String tableName, String charset) throws Exception {
String rowKey = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
clientService.delete(tableName, rowKey.getBytes(charset));
return clientService.toTransitUri(tableName, rowKey);
}
private FlowFile doDeleteFromContent(FlowFile flowFile, ProcessContext context, ProcessSession session, String tableName, int batchSize, String charset) throws Exception {
String keySeparator = context.getProperty(KEY_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
final String restartIndex = flowFile.getAttribute(RESTART_INDEX);
ByteArrayOutputStream out = new ByteArrayOutputStream();
session.exportTo(flowFile, out);
out.close();
String data = new String(out.toByteArray(), charset);
int restartFrom = -1;
if (restartIndex != null) {
restartFrom = Integer.parseInt(restartIndex);
}
String first = null, last = null;
List<byte[]> batch = new ArrayList<>();
if (data != null && data.length() > 0) {
String[] parts = data.split(keySeparator);
int index = 0;
try {
for (index = 0; index < parts.length; index++) {
if (restartFrom > 0 && index < restartFrom) {
continue;
}
if (first == null) {
first = parts[index];
}
batch.add(parts[index].getBytes(charset));
if (batch.size() == batchSize) {
clientService.delete(tableName, batch);
batch = new ArrayList<>();
}
last = parts[index];
}
if (batch.size() > 0) {
clientService.delete(tableName, batch);
}
flowFile = session.removeAttribute(flowFile, RESTART_INDEX);
flowFile = session.putAttribute(flowFile, ROWKEY_START, first);
flowFile = session.putAttribute(flowFile, ROWKEY_END, last);
} catch (Exception ex) {
getLogger().error("Error sending delete batch", ex);
int restartPoint = index - batch.size() > 0 ? index - batch.size() : 0;
flowFile = session.putAttribute(flowFile, RESTART_INDEX, String.valueOf(restartPoint));
}
}
return flowFile;
}
}
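To make the restart arithmetic above concrete: with the default batch size of 50, a failure while sending the second batch occurs at index 99 with 50 keys in the in-flight batch, so restart.index is written as 99 - 50 = 49. On the retry, every key before index 49 is skipped and deletes resume from there; re-issuing a delete for a row that was already removed is harmless in HBase.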

META-INF/services/org.apache.nifi.processor.Processor

@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.hbase.DeleteHBaseRow
org.apache.nifi.hbase.GetHBase
org.apache.nifi.hbase.PutHBaseCell
org.apache.nifi.hbase.PutHBaseJSON
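
The line added above registers DeleteHBaseRow with Java's ServiceLoader mechanism, which is how NiFi discovers the processors packaged in a NAR.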

MockHBaseClientService.java

@@ -24,7 +24,6 @@ import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -39,6 +38,7 @@ public class MockHBaseClientService extends AbstractControllerService implements
private Map<String,ResultCell[]> results = new HashMap<>();
private Map<String, List<PutFlowFile>> flowFilePuts = new HashMap<>();
private boolean throwException = false;
private boolean throwExceptionDuringBatchDelete = false;
private int numScans = 0;
private int numPuts = 0;
@Override
@@ -71,6 +71,40 @@ public class MockHBaseClientService extends AbstractControllerService implements
throw new UnsupportedOperationException();
}
private int deletePoint = 0;
public void setDeletePoint(int deletePoint) {
this.deletePoint = deletePoint;
}
@Override
public void delete(String tableName, List<byte[]> rowIds) throws IOException {
if (throwException) {
throw new RuntimeException("Simulated connectivity error");
}
int index = 0;
for (byte[] id : rowIds) {
String key = new String(id);
Object val = results.remove(key);
if (index == deletePoint && throwExceptionDuringBatchDelete) {
throw new RuntimeException("Forcing write of restart.index");
}
if (val == null && deletePoint >= 0) {
throw new RuntimeException(String.format("%s was never added.", key));
}
index++;
}
}
public int size() {
return results.size();
}
public boolean isEmpty() {
return results.isEmpty();
}
@Override
public void scan(String tableName, byte[] startRow, byte[] endRow, Collection<Column> columns, ResultHandler handler) throws IOException {
if (throwException) {
@@ -216,4 +250,12 @@ public class MockHBaseClientService extends AbstractControllerService implements
public void setFailureThreshold(int failureThreshold) {
this.failureThreshold = failureThreshold;
}
public boolean isThrowExceptionDuringBatchDelete() {
return throwExceptionDuringBatchDelete;
}
public void setThrowExceptionDuringBatchDelete(boolean throwExceptionDuringBatchDelete) {
this.throwExceptionDuringBatchDelete = throwExceptionDuringBatchDelete;
}
}
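
The deletePoint and throwExceptionDuringBatchDelete additions let a test force a failure partway through a batch delete; TestDeleteHBaseRow below uses them to exercise the restart.index behavior.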

TestDeleteHBaseRow.java

@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
public class TestDeleteHBaseRow {
private TestRunner runner;
private MockHBaseClientService hBaseClient;
@Before
public void setup() throws InitializationException {
runner = TestRunners.newTestRunner(new DeleteHBaseRow());
hBaseClient = new MockHBaseClientService();
runner.addControllerService("hbaseClient", hBaseClient);
runner.enableControllerService(hBaseClient);
runner.setProperty(DeleteHBaseRow.TABLE_NAME, "nifi");
runner.setProperty(DeleteHBaseRow.HBASE_CLIENT_SERVICE, "hbaseClient");
}
List<String> populateTable(int max) {
List<String> ids = new ArrayList<>();
for (int index = 0; index < max; index++) {
String uuid = UUID.randomUUID().toString();
ids.add(uuid);
Map<String, String> cells = new HashMap<>();
cells.put("test", UUID.randomUUID().toString());
hBaseClient.addResult(uuid, cells, System.currentTimeMillis());
}
return ids;
}
@Test
public void testSimpleDelete() {
List<String> ids = populateTable(100);
runner.setProperty(DeleteHBaseRow.BATCH_SIZE, "100");
runner.setProperty(DeleteHBaseRow.FLOWFILE_FETCH_COUNT, "100");
for (String id : ids) {
runner.enqueue(id);
}
runner.run(1, true);
Assert.assertTrue("The mock client was not empty.", hBaseClient.isEmpty());
}
private String buildSeparatedString(List<String> ids, String separator) {
StringBuilder sb = new StringBuilder();
for (int index = 1; index <= ids.size(); index++) {
sb.append(ids.get(index - 1)).append(separator);
}
return sb.toString();
}
private void testSeparatedDeletes(String separator) {
testSeparatedDeletes(separator, separator, new HashMap<>());
}
private void testSeparatedDeletes(String separator, String separatorProp, Map<String, String> attrs) {
List<String> ids = populateTable(10000);
runner.setProperty(DeleteHBaseRow.KEY_SEPARATOR, separator);
runner.setProperty(DeleteHBaseRow.BATCH_SIZE, "100");
runner.enqueue(buildSeparatedString(ids, separatorProp), attrs);
runner.run(1, true);
Assert.assertTrue("The mock client was not empty.", hBaseClient.isEmpty());
}
@Test
public void testDeletesSeparatedByNewLines() {
testSeparatedDeletes("\n");
}
@Test
public void testDeletesSeparatedByCommas() {
testSeparatedDeletes(",");
}
@Test
public void testDeleteWithELSeparator() {
runner.setValidateExpressionUsage(true);
Map<String, String> attrs = new HashMap<>();
attrs.put("test.separator", "____");
testSeparatedDeletes("${test.separator}", "____", attrs);
}
@Test
public void testDeleteWithExpressionLanguage() {
List<String> ids = populateTable(1000);
for (String id : ids) {
String[] parts = id.split("-");
Map<String, String> attrs = new HashMap<>();
for (int index = 0; index < parts.length; index++) {
attrs.put(String.format("part_%d", index), parts[index]);
}
runner.enqueue(id, attrs);
}
runner.setProperty(DeleteHBaseRow.ROW_ID, "${part_0}-${part_1}-${part_2}-${part_3}-${part_4}");
runner.setProperty(DeleteHBaseRow.ROW_ID_LOCATION, DeleteHBaseRow.ROW_ID_ATTR);
runner.setProperty(DeleteHBaseRow.BATCH_SIZE, "200");
runner.setValidateExpressionUsage(true);
runner.run(1, true);
}
@Test
public void testConnectivityErrorHandling() {
List<String> ids = populateTable(100);
for (String id : ids) {
runner.enqueue(id);
}
boolean exception = false;
try {
hBaseClient.setThrowException(true);
runner.run(1, true);
} catch (Exception ex) {
exception = true;
} finally {
hBaseClient.setThrowException(false);
}
Assert.assertFalse("An unhandled exception was caught.", exception);
}
@Test
public void testRestartIndexAttribute() {
List<String> ids = populateTable(500);
StringBuilder sb = new StringBuilder();
for (int index = 0; index < ids.size(); index++) {
sb.append(ids.get(index)).append( index < ids.size() - 1 ? "," : "");
}
runner.enqueue(sb.toString());
runner.setProperty(DeleteHBaseRow.ROW_ID_LOCATION, DeleteHBaseRow.ROW_ID_CONTENT);
Assert.assertTrue("There should have been 500 rows.", hBaseClient.size() == 500);
hBaseClient.setDeletePoint(20);
hBaseClient.setThrowExceptionDuringBatchDelete(true);
runner.run(1, true, true);
runner.assertTransferCount(DeleteHBaseRow.REL_FAILURE, 1);
runner.assertTransferCount(DeleteHBaseRow.REL_SUCCESS, 0);
Assert.assertTrue("Partially deleted", hBaseClient.size() < 500);
List<MockFlowFile> flowFile = runner.getFlowFilesForRelationship(DeleteHBaseRow.REL_FAILURE);
Assert.assertNotNull("Missing restart.index attribute", flowFile.get(0).getAttribute("restart.index"));
byte[] oldData = runner.getContentAsByteArray(flowFile.get(0));
Map<String, String> attrs = new HashMap<>();
attrs.put("restart.index", flowFile.get(0).getAttribute("restart.index"));
runner.enqueue(oldData, attrs);
hBaseClient.setDeletePoint(-1);
hBaseClient.setThrowExceptionDuringBatchDelete(false);
runner.clearTransferState();
runner.run(1, true, true);
runner.assertTransferCount(DeleteHBaseRow.REL_FAILURE, 0);
runner.assertTransferCount(DeleteHBaseRow.REL_SUCCESS, 1);
flowFile = runner.getFlowFilesForRelationship(DeleteHBaseRow.REL_SUCCESS);
Assert.assertTrue("The client should have been empty", hBaseClient.isEmpty());
Assert.assertNull("The restart.index attribute should be null", flowFile.get(0).getAttribute("restart.index"));
}
}

HBaseClientService.java

@@ -29,6 +29,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
@Tags({"hbase", "client"})
@CapabilityDescription("A controller service for accessing an HBase client.")
@@ -116,6 +117,15 @@ public interface HBaseClientService extends ControllerService {
*/
void delete(String tableName, byte[] rowId) throws IOException;
/**
* Deletes a list of rows in HBase. All cells are deleted.
*
* @param tableName the name of an HBase table
* @param rowIds a list of rowIds to send in a batch delete
*/
void delete(String tableName, List<byte[]> rowIds) throws IOException;
/**
* Scans the given table using the optional filter criteria and passing each result to the provided handler.
*
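
For orientation, here is a sketch of how a caller might use the new batch overload; the method deleteTwoRows and the table name "users" are illustrative assumptions, not code from this commit:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch; assumes a configured HBaseClientService instance.
void deleteTwoRows(final HBaseClientService clientService) throws IOException {
    final List<byte[]> rowIds = new ArrayList<>();
    rowIds.add("row-1".getBytes(StandardCharsets.UTF_8));
    rowIds.add("row-2".getBytes(StandardCharsets.UTF_8));
    // A single call batches both deletes; all cells in each row are removed.
    clientService.delete("users", rowIds);
}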

HBase_1_1_2_ClientService.java

@@ -355,6 +355,17 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
}
}
@Override
public void delete(String tableName, List<byte[]> rowIds) throws IOException {
List<Delete> deletes = new ArrayList<>();
for (int index = 0; index < rowIds.size(); index++) {
deletes.add(new Delete(rowIds.get(index)));
}
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
table.delete(deletes);
}
}
@Override
public void scan(final String tableName, final Collection<Column> columns, final String filterExpression, final long minTime, final ResultHandler handler)
throws IOException {