mirror of https://github.com/apache/nifi.git
NIFI-6656 Added a default visibility expression configuration item to… (#3723)
* NIFI-6656 Added a default visibility expression configuration item to the HBase map cache client services. * NIFI-6656 Fixed validation bug. * NIFI-6656 Added stylecheck fix. * NIFI-6656 Deleted shared interface and moved property descriptor into both services. * NIFI-6656 Removed dependency from hbase api.
This commit is contained in:
parent
9c9f9c10a9
commit
02d3b7e92b
|
@ -32,6 +32,10 @@ public class PutColumn {
|
|||
this(columnFamily, columnQualifier, buffer, null, null);
|
||||
}
|
||||
|
||||
public PutColumn(final byte[] columnFamily, final byte[] columnQualifier, final byte[] buffer, final String visibility) {
|
||||
this(columnFamily, columnQualifier, buffer, null, visibility);
|
||||
}
|
||||
|
||||
public PutColumn(final byte[] columnFamily, final byte[] columnQualifier, final byte[] buffer, final Long timestamp) {
|
||||
this(columnFamily, columnQualifier, buffer, timestamp, null);
|
||||
}
|
||||
|
@ -41,7 +45,7 @@ public class PutColumn {
|
|||
this.columnQualifier = columnQualifier;
|
||||
this.buffer = buffer;
|
||||
this.timestamp = timestamp;
|
||||
this.visibility = visibility;
|
||||
this.visibility = (visibility != null && visibility.trim().length() > 0) ? visibility : null;
|
||||
}
|
||||
|
||||
public byte[] getColumnFamily() {
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
|
|||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.Validator;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
|
||||
|
@ -84,12 +85,22 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService
|
|||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor VISIBILITY_EXPRESSION = new PropertyDescriptor.Builder()
|
||||
.name("hbase-cache-visibility-expression")
|
||||
.displayName("Visibility Expression")
|
||||
.description("The default visibility expression to apply to cells when visibility expression support is enabled.")
|
||||
.defaultValue("")
|
||||
.addValidator(Validator.VALID)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||
descriptors.add(HBASE_CACHE_TABLE_NAME);
|
||||
descriptors.add(AUTHORIZATIONS);
|
||||
descriptors.add(VISIBILITY_EXPRESSION);
|
||||
descriptors.add(HBASE_CLIENT_SERVICE);
|
||||
descriptors.add(HBASE_COLUMN_FAMILY);
|
||||
descriptors.add(HBASE_COLUMN_QUALIFIER);
|
||||
|
@ -107,6 +118,7 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService
|
|||
private volatile byte[] hBaseColumnQualifierBytes;
|
||||
|
||||
private List<String> authorizations;
|
||||
private String defaultVisibilityExpression;
|
||||
|
||||
@OnEnabled
|
||||
public void onConfigured(final ConfigurationContext context) throws InitializationException{
|
||||
|
@ -120,6 +132,11 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService
|
|||
hBaseColumnQualifierBytes = hBaseColumnQualifier.getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
authorizations = VisibilityLabelUtils.getAuthorizations(context);
|
||||
if (context.getProperty(VISIBILITY_EXPRESSION).isSet()) {
|
||||
defaultVisibilityExpression = context.getProperty(VISIBILITY_EXPRESSION).evaluateAttributeExpressions().getValue();
|
||||
} else {
|
||||
defaultVisibilityExpression = null;
|
||||
}
|
||||
}
|
||||
|
||||
private <T> byte[] serialize(final T value, final Serializer<T> serializer) throws IOException {
|
||||
|
@ -137,7 +154,7 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService
|
|||
|
||||
final byte[] rowIdBytes = serialize(key, keySerializer);
|
||||
final byte[] valueBytes = serialize(value, valueSerializer);
|
||||
final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes);
|
||||
final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes, defaultVisibilityExpression);
|
||||
|
||||
return hBaseClientService.checkAndPut(hBaseCacheTableName, rowIdBytes, hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, null, putColumn);
|
||||
}
|
||||
|
@ -149,7 +166,7 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService
|
|||
final byte[] rowIdBytes = serialize(key, keySerializer);
|
||||
final byte[] valueBytes = serialize(value, valueSerializer);
|
||||
|
||||
final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes);
|
||||
final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes, defaultVisibilityExpression);
|
||||
putColumns.add(putColumn);
|
||||
|
||||
hBaseClientService.put(hBaseCacheTableName, rowIdBytes, putColumns);
|
||||
|
@ -254,7 +271,7 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService
|
|||
final byte[] rowIdBytes = serialize(entry.getKey(), keySerializer);
|
||||
final byte[] valueBytes = serialize(entry.getValue(), valueSerializer);
|
||||
final byte[] revision = entry.getRevision().orElse(null);
|
||||
final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes);
|
||||
final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes, defaultVisibilityExpression);
|
||||
|
||||
// If the current revision is unset then only insert the row if it doesn't already exist.
|
||||
return hBaseClientService.checkAndPut(hBaseCacheTableName, rowIdBytes, hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, revision, putColumn);
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
|
|||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.Validator;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
|
||||
|
@ -85,11 +86,22 @@ public class HBase_2_ClientMapCacheService extends AbstractControllerService imp
|
|||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor VISIBILITY_EXPRESSION = new PropertyDescriptor.Builder()
|
||||
.name("hbase-cache-visibility-expression")
|
||||
.displayName("Visibility Expression")
|
||||
.description("The default visibility expression to apply to cells when visibility expression support is enabled.")
|
||||
.defaultValue("")
|
||||
.addValidator(Validator.VALID)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||
descriptors.add(HBASE_CACHE_TABLE_NAME);
|
||||
descriptors.add(AUTHORIZATIONS);
|
||||
descriptors.add(VISIBILITY_EXPRESSION);
|
||||
descriptors.add(HBASE_CLIENT_SERVICE);
|
||||
descriptors.add(HBASE_COLUMN_FAMILY);
|
||||
descriptors.add(HBASE_COLUMN_QUALIFIER);
|
||||
|
@ -107,6 +119,7 @@ public class HBase_2_ClientMapCacheService extends AbstractControllerService imp
|
|||
private volatile byte[] hBaseColumnQualifierBytes;
|
||||
|
||||
private List<String> authorizations;
|
||||
private String defaultVisibilityExpression;
|
||||
|
||||
@OnEnabled
|
||||
public void onConfigured(final ConfigurationContext context) throws InitializationException{
|
||||
|
@ -120,6 +133,11 @@ public class HBase_2_ClientMapCacheService extends AbstractControllerService imp
|
|||
hBaseColumnQualifierBytes = hBaseColumnQualifier.getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
authorizations = VisibilityLabelUtils.getAuthorizations(context);
|
||||
if (context.getProperty(VISIBILITY_EXPRESSION).isSet()) {
|
||||
defaultVisibilityExpression = context.getProperty(VISIBILITY_EXPRESSION).evaluateAttributeExpressions().getValue();
|
||||
} else {
|
||||
defaultVisibilityExpression = null;
|
||||
}
|
||||
}
|
||||
|
||||
private <T> byte[] serialize(final T value, final Serializer<T> serializer) throws IOException {
|
||||
|
@ -137,7 +155,7 @@ public class HBase_2_ClientMapCacheService extends AbstractControllerService imp
|
|||
|
||||
final byte[] rowIdBytes = serialize(key, keySerializer);
|
||||
final byte[] valueBytes = serialize(value, valueSerializer);
|
||||
final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes);
|
||||
final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes, defaultVisibilityExpression);
|
||||
|
||||
return hBaseClientService.checkAndPut(hBaseCacheTableName, rowIdBytes, hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, null, putColumn);
|
||||
}
|
||||
|
@ -149,7 +167,7 @@ public class HBase_2_ClientMapCacheService extends AbstractControllerService imp
|
|||
final byte[] rowIdBytes = serialize(key, keySerializer);
|
||||
final byte[] valueBytes = serialize(value, valueSerializer);
|
||||
|
||||
final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes);
|
||||
final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes, defaultVisibilityExpression);
|
||||
putColumns.add(putColumn);
|
||||
|
||||
hBaseClientService.put(hBaseCacheTableName, rowIdBytes, putColumns);
|
||||
|
@ -254,7 +272,7 @@ public class HBase_2_ClientMapCacheService extends AbstractControllerService imp
|
|||
final byte[] rowIdBytes = serialize(entry.getKey(), keySerializer);
|
||||
final byte[] valueBytes = serialize(entry.getValue(), valueSerializer);
|
||||
final byte[] revision = entry.getRevision().orElse(null);
|
||||
final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes);
|
||||
final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes, defaultVisibilityExpression);
|
||||
|
||||
// If the current revision is unset then only insert the row if it doesn't already exist.
|
||||
return hBaseClientService.checkAndPut(hBaseCacheTableName, rowIdBytes, hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, revision, putColumn);
|
||||
|
|
Loading…
Reference in New Issue