NIFI-6432 Updating get, containsKey, and remove in HBase_1_1_2_ClientMapCacheService and HBase_2_ClientMapCacheService so that they use the column family and qualifier specified in the service

This closes #3581.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Bryan Bende 2019-07-11 14:31:13 -04:00 committed by Koji Kawamura
parent e277545cea
commit f2db1539a8
No known key found for this signature in database
GPG Key ID: 36136B0EC89E4758
2 changed files with 32 additions and 32 deletions

View File

@ -16,12 +16,6 @@
*/
package org.apache.nifi.hbase;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
@ -29,22 +23,25 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.reporting.InitializationException;
import java.nio.charset.StandardCharsets;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS;
@ -164,6 +161,7 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService
final HBaseRowHandler handler = new HBaseRowHandler();
final List<Column> columnsList = new ArrayList<Column>(0);
columnsList.add(new Column(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes));
hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, authorizations, handler);
return (handler.numRows() > 0);
@ -196,6 +194,7 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService
final HBaseRowHandler handler = new HBaseRowHandler();
final List<Column> columnsList = new ArrayList<Column>(0);
columnsList.add(new Column(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes));
hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, authorizations, handler);
if (handler.numRows() > 1) {
@ -212,7 +211,8 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService
final boolean contains = containsKey(key, keySerializer);
if (contains) {
final byte[] rowIdBytes = serialize(key, keySerializer);
hBaseClientService.delete(hBaseCacheTableName, rowIdBytes);
final DeleteRequest deleteRequest = new DeleteRequest(rowIdBytes, hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, null);
hBaseClientService.deleteCells(hBaseCacheTableName, Collections.singletonList(deleteRequest));
}
return contains;
}

View File

@ -16,12 +16,6 @@
*/
package org.apache.nifi.hbase;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
@ -29,22 +23,25 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.reporting.InitializationException;
import java.nio.charset.StandardCharsets;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS;
@ -164,6 +161,7 @@ public class HBase_2_ClientMapCacheService extends AbstractControllerService imp
final HBaseRowHandler handler = new HBaseRowHandler();
final List<Column> columnsList = new ArrayList<Column>(0);
columnsList.add(new Column(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes));
hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, authorizations, handler);
return (handler.numRows() > 0);
@ -196,6 +194,7 @@ public class HBase_2_ClientMapCacheService extends AbstractControllerService imp
final HBaseRowHandler handler = new HBaseRowHandler();
final List<Column> columnsList = new ArrayList<Column>(0);
columnsList.add(new Column(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes));
hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, authorizations, handler);
if (handler.numRows() > 1) {
@ -212,7 +211,8 @@ public class HBase_2_ClientMapCacheService extends AbstractControllerService imp
final boolean contains = containsKey(key, keySerializer);
if (contains) {
final byte[] rowIdBytes = serialize(key, keySerializer);
hBaseClientService.delete(hBaseCacheTableName, rowIdBytes);
final DeleteRequest deleteRequest = new DeleteRequest(rowIdBytes, hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, null);
hBaseClientService.deleteCells(hBaseCacheTableName, Collections.singletonList(deleteRequest));
}
return contains;
}