From 143d7e6829c7ac5967dceb0df4a9f7f4456720eb Mon Sep 17 00:00:00 2001
From: Mike Thomsen
Date: Tue, 13 Feb 2018 13:12:00 -0500
Subject: [PATCH] NIFI-3538 Added DeleteHBaseRow

This closes #2294.

Signed-off-by: Koji Kawamura
---
 .../nifi/hbase/AbstractDeleteHBase.java            | 103 +++++++++
 .../org/apache/nifi/hbase/DeleteHBaseRow.java      | 215 ++++++++++++++++++
 .../org.apache.nifi.processor.Processor            |   1 +
 .../nifi/hbase/MockHBaseClientService.java         |  44 +++-
 .../apache/nifi/hbase/TestDeleteHBaseRow.java      | 197 ++++++++++++++++
 .../apache/nifi/hbase/HBaseClientService.java      |  10 +
 .../nifi/hbase/HBase_1_1_2_ClientService.java      |  11 +
 7 files changed, 580 insertions(+), 1 deletion(-)
 create mode 100644 nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractDeleteHBase.java
 create mode 100644 nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseRow.java
 create mode 100644 nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseRow.java

diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractDeleteHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractDeleteHBase.java
new file mode 100644
index 0000000000..a097fbefb6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractDeleteHBase.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractDeleteHBase extends AbstractProcessor {
+    protected static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
+            .name("HBase Client Service")
+            .description("Specifies the Controller Service to use for accessing HBase.")
+            .required(true)
+            .identifiesControllerService(HBaseClientService.class)
+            .build();
+    protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("Table Name")
+            .description("The name of the HBase Table.")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    protected static final PropertyDescriptor ROW_ID = new PropertyDescriptor.Builder()
+            .name("Row Identifier")
+            .description("Specifies the Row ID to use when deleting data from HBase.")
+            .required(false) // not all sub-classes will require this
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after its delete(s) have been successfully sent to HBase")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if it cannot be sent to HBase")
+            .build();
+
+    protected HBaseClientService clientService;
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        clientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        Set<Relationship> set = new HashSet<>();
+        set.add(REL_SUCCESS);
+        set.add(REL_FAILURE);
+
+        return set;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(HBASE_CLIENT_SERVICE);
+        properties.add(TABLE_NAME);
+        properties.add(ROW_ID);
+
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        try {
+            doDelete(context, session);
+        } catch (Exception e) {
+            getLogger().error("Error occurred during delete operation", e);
+        } finally {
+            session.commit();
+        }
+    }
+
+    protected abstract void doDelete(ProcessContext context, ProcessSession session) throws Exception;
+}
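
[Editorial aside, not part of the patch: the base class above leaves only doDelete() abstract and handles the client service wiring, relationships, and session commit itself. The sketch below shows the minimal contract a subclass has to fulfil. The class name SingleRowDeleteExample and its behavior are hypothetical, used only for illustration.]

// Editorial sketch: a hypothetical minimal subclass of AbstractDeleteHBase
// that deletes the single row named by the Row Identifier property.
package org.apache.nifi.hbase;

import java.nio.charset.StandardCharsets;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;

public class SingleRowDeleteExample extends AbstractDeleteHBase {
    @Override
    protected void doDelete(ProcessContext context, ProcessSession session) throws Exception {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        // Both properties support expression language, so evaluate them per flowfile.
        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
        final String rowId = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
        // clientService is populated by AbstractDeleteHBase.onScheduled().
        clientService.delete(tableName, rowId.getBytes(StandardCharsets.UTF_8));
        session.transfer(flowFile, REL_SUCCESS);
    }
}
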
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseRow.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseRow.java
new file mode 100644
index 0000000000..8aec55a450
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseRow.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+@WritesAttributes(
+    value = {
+        @WritesAttribute( attribute = "restart.index", description = "If a delete batch fails, the 'restart.index' attribute is added to the FlowFile and it is sent to the 'failure' "
+                + "relationship, so that this processor can retry from that point when the same FlowFile is routed again." ),
+        @WritesAttribute( attribute = "rowkey.start", description = "The first rowkey in the flowfile. Only written when using the flowfile's content for the row IDs."),
+        @WritesAttribute( attribute = "rowkey.end", description = "The last rowkey in the flowfile. Only written when using the flowfile's content for the row IDs.")
+    }
+)
+@Tags({ "delete", "hbase" })
+@CapabilityDescription(
+        "Delete HBase records individually or in batches. The input can be a single row ID in the flowfile content, one ID per line, "
+                + "or row IDs separated by a configurable separator character (default is a comma).")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class DeleteHBaseRow extends AbstractDeleteHBase {
+    static final AllowableValue ROW_ID_CONTENT = new AllowableValue("content", "FlowFile content", "Get the row key(s) from the flowfile content.");
+    static final AllowableValue ROW_ID_ATTR = new AllowableValue("attr", "FlowFile attributes", "Get the row key from an expression language statement.");
+
+    static final String RESTART_INDEX = "restart.index";
+    static final String ROWKEY_START = "rowkey.start";
+    static final String ROWKEY_END = "rowkey.end";
+
+    static final PropertyDescriptor ROW_ID_LOCATION = new PropertyDescriptor.Builder()
+            .name("delete-hb-row-id-location")
+            .displayName("Row ID Location")
+            .description("The location of the row ID to use for building the delete. Can be from the content or an expression language statement.")
+            .required(true)
+            .defaultValue(ROW_ID_CONTENT.getValue())
+            .allowableValues(ROW_ID_CONTENT, ROW_ID_ATTR)
+            .addValidator(Validator.VALID)
+            .build();
+
+    static final PropertyDescriptor FLOWFILE_FETCH_COUNT = new PropertyDescriptor.Builder()
+            .name("delete-hb-flowfile-fetch-count")
+            .displayName("Flowfile Fetch Count")
+            .description("The number of flowfiles to fetch per run.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("5")
+            .expressionLanguageSupported(false)
+            .build();
+
+    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("delete-hb-batch-size")
+            .displayName("Batch Size")
+            .description("The number of deletes to send per batch.")
+            .required(true)
+            .defaultValue("50")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+
+    static final PropertyDescriptor KEY_SEPARATOR = new PropertyDescriptor.Builder()
+            .name("delete-hb-separator")
+            .displayName("Delete Row Key Separator")
+            .description("The separator character(s) used to delimit multiple row keys "
+                    + "when more than one row key is provided in the flowfile content")
+            .required(true)
+            .defaultValue(",")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("delete-char-set")
+            .displayName("Character Set")
+            .description("The character set used to encode the row key for HBase.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = super.getSupportedPropertyDescriptors();
+        properties.add(ROW_ID_LOCATION);
+        properties.add(FLOWFILE_FETCH_COUNT);
+        properties.add(BATCH_SIZE);
+        properties.add(KEY_SEPARATOR);
+        properties.add(CHARSET);
+
+        return properties;
+    }
+
+    @Override
+    protected void doDelete(ProcessContext context, ProcessSession session) throws Exception {
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+        final String location = context.getProperty(ROW_ID_LOCATION).getValue();
+        final int flowFileCount = context.getProperty(FLOWFILE_FETCH_COUNT).asInteger();
+        final String charset = context.getProperty(CHARSET).getValue();
+        List<FlowFile> flowFiles = session.get(flowFileCount);
+
+        if (flowFiles != null && flowFiles.size() > 0) {
+            for (int index = 0; index < flowFiles.size(); index++) {
+                FlowFile flowFile = flowFiles.get(index);
+                final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+                try {
+                    if (location.equals(ROW_ID_CONTENT.getValue())) {
+                        flowFile = doDeleteFromContent(flowFile, context, session, tableName, batchSize, charset);
+                        if (flowFile.getAttribute(RESTART_INDEX) != null) {
+                            session.transfer(flowFile, REL_FAILURE);
+                        } else {
+                            final String transitUrl = clientService.toTransitUri(tableName, flowFile.getAttribute(ROWKEY_END));
+                            session.transfer(flowFile, REL_SUCCESS);
+                            session.getProvenanceReporter().invokeRemoteProcess(flowFile, transitUrl);
+                        }
+                    } else {
+                        String transitUrl = doDeleteFromAttribute(flowFile, context, tableName, charset);
+                        session.transfer(flowFile, REL_SUCCESS);
+                        session.getProvenanceReporter().invokeRemoteProcess(flowFile, transitUrl);
+                    }
+                } catch (Exception ex) {
+                    getLogger().error(ex.getMessage(), ex);
+                    session.transfer(flowFile, REL_FAILURE);
+                }
+            }
+        }
+    }
+
+    private String doDeleteFromAttribute(FlowFile flowFile, ProcessContext context, String tableName, String charset) throws Exception {
+        String rowKey = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
+        clientService.delete(tableName, rowKey.getBytes(charset));
+
+        return clientService.toTransitUri(tableName, rowKey);
+    }
+
+    private FlowFile doDeleteFromContent(FlowFile flowFile, ProcessContext context, ProcessSession session, String tableName, int batchSize, String charset) throws Exception {
+        String keySeparator = context.getProperty(KEY_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
+        final String restartIndex = flowFile.getAttribute(RESTART_INDEX);
+
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        session.exportTo(flowFile, out);
+        out.close();
+
+        String data = new String(out.toByteArray(), charset);
+
+        int restartFrom = -1;
+        if (restartIndex != null) {
+            restartFrom = Integer.parseInt(restartIndex);
+        }
+
+        String first = null, last = null;
+
+        List<byte[]> batch = new ArrayList<>();
+        if (data != null && data.length() > 0) {
+            // Note: String.split treats the separator as a regular expression.
+            String[] parts = data.split(keySeparator);
+            int index = 0;
+            try {
+                for (index = 0; index < parts.length; index++) {
+                    // Skip keys that were already deleted before a prior failure.
+                    if (restartFrom > 0 && index < restartFrom) {
+                        continue;
+                    }
+
+                    if (first == null) {
+                        first = parts[index];
+                    }
+
+                    batch.add(parts[index].getBytes(charset));
+                    if (batch.size() == batchSize) {
+                        clientService.delete(tableName, batch);
+                        batch = new ArrayList<>();
+                    }
+                    last = parts[index];
+                }
+                if (batch.size() > 0) {
+                    clientService.delete(tableName, batch);
+                }
+
+                flowFile = session.removeAttribute(flowFile, RESTART_INDEX);
+                flowFile = session.putAttribute(flowFile, ROWKEY_START, first);
+                flowFile = session.putAttribute(flowFile, ROWKEY_END, last);
+            } catch (Exception ex) {
+                getLogger().error("Error sending delete batch", ex);
+                // Resume before the keys that were buffered in the failed batch.
+                int restartPoint = index - batch.size() > 0 ? index - batch.size() : 0;
+                flowFile = session.putAttribute(flowFile, RESTART_INDEX, String.valueOf(restartPoint));
+            }
+        }
+
+        return flowFile;
+    }
+}
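
[Editorial aside, not part of the patch: the restart arithmetic in doDeleteFromContent() is compact enough to misread, so here is what it computes. Suppose 120 keys arrive with a batch size of 100: the first 100 are flushed when the batch fills, the remaining 20 are buffered, and the trailing flush fails after the loop has advanced index to 120. The retry should resume at the first key of the failed batch, clamped to 0 when the very first batch fails. The class below is a hypothetical standalone demonstration.]

// Editorial sketch: the restart.index calculation used by
// DeleteHBaseRow.doDeleteFromContent(), extracted for illustration.
public class RestartIndexDemo {
    // 'index' is the loop position at failure time; 'buffered' is the number
    // of keys still sitting in the unflushed batch.
    static int restartPoint(int index, int buffered) {
        return index - buffered > 0 ? index - buffered : 0;
    }

    public static void main(String[] args) {
        // 120 keys, batch size 100: trailing flush of 20 keys fails with
        // index == 120, so the retry resumes at 120 - 20 = 100.
        System.out.println(restartPoint(120, 20)); // 100
        // First batch fails with all 100 keys buffered: clamp to 0.
        System.out.println(restartPoint(99, 100)); // 0
    }
}
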
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 21c827cc7a..b2cccc8930 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+org.apache.nifi.hbase.DeleteHBaseRow
 org.apache.nifi.hbase.GetHBase
 org.apache.nifi.hbase.PutHBaseCell
 org.apache.nifi.hbase.PutHBaseJSON
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
index f62102a8e1..d720344293 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -24,7 +24,6 @@ import org.apache.nifi.hbase.scan.Column;
 import org.apache.nifi.hbase.scan.ResultCell;
 import org.apache.nifi.hbase.scan.ResultHandler;
-
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -39,6 +38,7 @@ public class MockHBaseClientService extends AbstractControllerService implements
     private Map<String, ResultCell[]> results = new HashMap<>();
     private Map<String, List<PutColumn>> flowFilePuts = new HashMap<>();
     private boolean throwException = false;
+    private boolean throwExceptionDuringBatchDelete = false;
     private int numScans = 0;
     private int numPuts = 0;
     @Override
@@ -71,6 +71,40 @@
         throw new UnsupportedOperationException();
     }

+    private int deletePoint = 0;
+    public void setDeletePoint(int deletePoint) {
+        this.deletePoint = deletePoint;
+    }
+
+    @Override
+    public void delete(String tableName, List<byte[]> rowIds) throws IOException {
+        if (throwException) {
+            throw new RuntimeException("Simulated connectivity error");
+        }
+
+        int index = 0;
+        for (byte[] id : rowIds) {
+            String key = new String(id);
+            Object val = results.remove(key);
+            if (index == deletePoint && throwExceptionDuringBatchDelete) {
+                throw new RuntimeException("Forcing write of restart.index");
+            }
+            if (val == null && deletePoint >= 0) {
+                throw new RuntimeException(String.format("%s was never added.", key));
+            }
+
+            index++;
+        }
+    }
+
+    public int size() {
+        return results.size();
+    }
+
+    public boolean isEmpty() {
+        return results.isEmpty();
+    }
+
     @Override
     public void scan(String tableName, byte[] startRow, byte[] endRow, Collection<Column> columns, ResultHandler handler) throws IOException {
         if (throwException) {
@@ -216,4 +250,12 @@
     public void setFailureThreshold(int failureThreshold) {
         this.failureThreshold = failureThreshold;
     }
+
+    public boolean isThrowExceptionDuringBatchDelete() {
+        return throwExceptionDuringBatchDelete;
+    }
+
+    public void setThrowExceptionDuringBatchDelete(boolean throwExceptionDuringBatchDelete) {
+        this.throwExceptionDuringBatchDelete = throwExceptionDuringBatchDelete;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseRow.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseRow.java
new file mode 100644
index 0000000000..6c0d92bb88
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseRow.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class TestDeleteHBaseRow {
+    private TestRunner runner;
+    private MockHBaseClientService hBaseClient;
+
+    @Before
+    public void setup() throws InitializationException {
+        runner = TestRunners.newTestRunner(new DeleteHBaseRow());
+
+        hBaseClient = new MockHBaseClientService();
+        runner.addControllerService("hbaseClient", hBaseClient);
+        runner.enableControllerService(hBaseClient);
+
+        runner.setProperty(DeleteHBaseRow.TABLE_NAME, "nifi");
+        runner.setProperty(DeleteHBaseRow.HBASE_CLIENT_SERVICE, "hbaseClient");
+    }
+
+    List<String> populateTable(int max) {
+        List<String> ids = new ArrayList<>();
+        for (int index = 0; index < max; index++) {
+            String uuid = UUID.randomUUID().toString();
+            ids.add(uuid);
+            Map<String, String> cells = new HashMap<>();
+            cells.put("test", UUID.randomUUID().toString());
+            hBaseClient.addResult(uuid, cells, System.currentTimeMillis());
+        }
+
+        return ids;
+    }
+
+    @Test
+    public void testSimpleDelete() {
+        List<String> ids = populateTable(100);
+
+        runner.setProperty(DeleteHBaseRow.BATCH_SIZE, "100");
+        runner.setProperty(DeleteHBaseRow.FLOWFILE_FETCH_COUNT, "100");
+        for (String id : ids) {
+            runner.enqueue(id);
+        }
+
+        runner.run(1, true);
+        Assert.assertTrue("The mock client was not empty.", hBaseClient.isEmpty());
+    }
+
+    private String buildSeparatedString(List<String> ids, String separator) {
+        StringBuilder sb = new StringBuilder();
+        for (int index = 1; index <= ids.size(); index++) {
+            sb.append(ids.get(index - 1)).append(separator);
+        }
+
+        return sb.toString();
+    }
+
+    private void testSeparatedDeletes(String separator) {
+        testSeparatedDeletes(separator, separator, new HashMap<String, String>());
+    }
+
+    private void testSeparatedDeletes(String separator, String separatorProp, Map<String, String> attrs) {
+        List<String> ids = populateTable(10000);
+        runner.setProperty(DeleteHBaseRow.KEY_SEPARATOR, separator);
+        runner.setProperty(DeleteHBaseRow.BATCH_SIZE, "100");
+        runner.enqueue(buildSeparatedString(ids, separatorProp), attrs);
+        runner.run(1, true);
+
+        Assert.assertTrue("The mock client was not empty.", hBaseClient.isEmpty());
+    }
+
+    @Test
+    public void testDeletesSeparatedByNewLines() {
+        testSeparatedDeletes("\n");
+    }
+
+    @Test
+    public void testDeletesSeparatedByCommas() {
+        testSeparatedDeletes(",");
+    }
+
+    @Test
+    public void testDeleteWithELSeparator() {
+        runner.setValidateExpressionUsage(true);
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put("test.separator", "____");
+        testSeparatedDeletes("${test.separator}", "____", attrs);
+    }
+
+    @Test
+    public void testDeleteWithExpressionLanguage() {
+        List<String> ids = populateTable(1000);
+        for (String id : ids) {
+            String[] parts = id.split("-");
+            Map<String, String> attrs = new HashMap<>();
+            for (int index = 0; index < parts.length; index++) {
+                attrs.put(String.format("part_%d", index), parts[index]);
+            }
+            runner.enqueue(id, attrs);
+        }
+        runner.setProperty(DeleteHBaseRow.ROW_ID, "${part_0}-${part_1}-${part_2}-${part_3}-${part_4}");
+        runner.setProperty(DeleteHBaseRow.ROW_ID_LOCATION, DeleteHBaseRow.ROW_ID_ATTR);
+        runner.setProperty(DeleteHBaseRow.BATCH_SIZE, "200");
+        runner.setValidateExpressionUsage(true);
+        runner.run(1, true);
+    }
+
+    @Test
+    public void testConnectivityErrorHandling() {
+        List<String> ids = populateTable(100);
+        for (String id : ids) {
+            runner.enqueue(id);
+        }
+        boolean exception = false;
+        try {
+            hBaseClient.setThrowException(true);
+            runner.run(1, true);
+        } catch (Exception ex) {
+            exception = true;
+        } finally {
+            hBaseClient.setThrowException(false);
+        }
+
+        Assert.assertFalse("An unhandled exception was caught.", exception);
+    }
+
+    @Test
+    public void testRestartIndexAttribute() {
+        List<String> ids = populateTable(500);
+        StringBuilder sb = new StringBuilder();
+        for (int index = 0; index < ids.size(); index++) {
+            sb.append(ids.get(index)).append(index < ids.size() - 1 ? "," : "");
+        }
+        runner.enqueue(sb.toString());
+        runner.setProperty(DeleteHBaseRow.ROW_ID_LOCATION, DeleteHBaseRow.ROW_ID_CONTENT);
+
+        Assert.assertTrue("There should have been 500 rows.", hBaseClient.size() == 500);
+
+        hBaseClient.setDeletePoint(20);
+        hBaseClient.setThrowExceptionDuringBatchDelete(true);
+        runner.run(1, true, true);
+
+        runner.assertTransferCount(DeleteHBaseRow.REL_FAILURE, 1);
+        runner.assertTransferCount(DeleteHBaseRow.REL_SUCCESS, 0);
+
+        Assert.assertTrue("Partially deleted", hBaseClient.size() < 500);
+
+        List<MockFlowFile> flowFile = runner.getFlowFilesForRelationship(DeleteHBaseRow.REL_FAILURE);
+        Assert.assertNotNull("Missing restart.index attribute", flowFile.get(0).getAttribute("restart.index"));
+
+        byte[] oldData = runner.getContentAsByteArray(flowFile.get(0));
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put("restart.index", flowFile.get(0).getAttribute("restart.index"));
+        runner.enqueue(oldData, attrs);
+        hBaseClient.setDeletePoint(-1);
+        hBaseClient.setThrowExceptionDuringBatchDelete(false);
+        runner.clearTransferState();
+        runner.run(1, true, true);
+
+        runner.assertTransferCount(DeleteHBaseRow.REL_FAILURE, 0);
+        runner.assertTransferCount(DeleteHBaseRow.REL_SUCCESS, 1);
+
+        flowFile = runner.getFlowFilesForRelationship(DeleteHBaseRow.REL_SUCCESS);
+
+        Assert.assertTrue("The client should have been empty", hBaseClient.isEmpty());
+        Assert.assertNull("The restart.index attribute should be null", flowFile.get(0).getAttribute("restart.index"));
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
index 0c2a131fd0..10d17ab281 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
@@ -29,6 +29,7 @@ import org.apache.nifi.processor.util.StandardValidators;

 import java.io.IOException;
 import java.util.Collection;
+import java.util.List;

 @Tags({"hbase", "client"})
 @CapabilityDescription("A controller service for accessing an HBase client.")
@@ -116,6 +117,15 @@ public interface HBaseClientService extends ControllerService {
      */
     void delete(String tableName, byte[] rowId) throws IOException;

+    /**
+     * Deletes a list of rows in HBase. All cells are deleted.
+     *
+     * @param tableName the name of an HBase table
+     * @param rowIds a list of rowIds to send in a batch delete
+     */
+    void delete(String tableName, List<byte[]> rowIds) throws IOException;
+
     /**
      * Scans the given table using the optional filter criteria and passing each result to the provided handler.
      *
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
index cc6927bc36..07a3cf2bd8 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
@@ -355,6 +355,17 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
         }
     }

+    @Override
+    public void delete(String tableName, List<byte[]> rowIds) throws IOException {
+        List<Delete> deletes = new ArrayList<>();
+        for (int index = 0; index < rowIds.size(); index++) {
+            deletes.add(new Delete(rowIds.get(index)));
+        }
+        try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
+            table.delete(deletes);
+        }
+    }
+
     @Override
     public void scan(final String tableName, final Collection<Column> columns, final String filterExpression, final long minTime, final ResultHandler handler)
             throws IOException {
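
[Editorial aside, not part of the patch: a short sketch of how a caller might exercise the new batch overload added to HBaseClientService. The class name BatchDeleteExample, the helper method, and the assumption of an already-enabled service instance are all illustrative.]

// Editorial sketch: invoking the new batch delete overload.
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.nifi.hbase.HBaseClientService;

public class BatchDeleteExample {
    // Encodes each key and sends one batched delete; every cell of each
    // listed row is removed, mirroring the single-row delete semantics.
    static void deleteRows(HBaseClientService clientService, String tableName, List<String> keys) throws IOException {
        List<byte[]> rowIds = new ArrayList<>();
        for (String key : keys) {
            rowIds.add(key.getBytes(StandardCharsets.UTF_8));
        }
        clientService.delete(tableName, rowIds);
    }
}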