NIFI-11149 Added PutRedisHashRecord Processor

This closes #6943

Signed-off-by: David Handermann <exceptionfactory@apache.org>
Matthew Burgess 2023-02-10 16:27:02 -05:00 committed by exceptionfactory
parent b56b1da842
commit 9c5ae0d098
7 changed files with 559 additions and 19 deletions

pom.xml

@@ -68,6 +68,20 @@
<artifactId>nifi-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-path</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
@@ -75,6 +89,12 @@
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.kstyrc</groupId>
<artifactId>embedded-redis</artifactId>

PutRedisHashRecord.java

@@ -0,0 +1,281 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.redis.processor;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.redis.RedisConnectionPool;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.springframework.data.redis.connection.RedisConnection;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"put", "redis", "hash", "record"})
@CapabilityDescription("Puts record field data into Redis using a specified hash value, which is determined by a RecordPath to a field in each record containing the hash value. The record fields "
+ "and values are stored as key/value pairs associated by the hash value. NOTE: Neither the evaluated hash value nor any of the field values can be null. If the hash value is null, "
+ "the FlowFile will be routed to failure. For each of the field values, if the value is null that field will be not set in Redis.")
@WritesAttributes({
@WritesAttribute(
attribute = PutRedisHashRecord.SUCCESS_RECORD_COUNT,
description = "Number of records written to Redis")
})
public class PutRedisHashRecord extends AbstractProcessor {
public static final String SUCCESS_RECORD_COUNT = "redis.success.record.count";
protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
.description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.build();
static final PropertyDescriptor HASH_VALUE_RECORD_PATH = new PropertyDescriptor.Builder()
.name("hash-value-record-path")
.displayName("Hash Value Record Path")
.description("Specifies a RecordPath to evaluate against each Record in order to determine the hash value associated with all the record fields/values "
+ "(see 'hset' in Redis documentation for more details). The RecordPath must point at exactly one field or an error will occur.")
.required(true)
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(NONE)
.build();
static final PropertyDescriptor DATA_RECORD_PATH = new PropertyDescriptor.Builder()
.name("data-record-path")
.displayName("Data Record Path")
.description("This property denotes a RecordPath that will be evaluated against each incoming Record and the Record that results from evaluating the RecordPath will be sent to" +
" Redis instead of sending the entire incoming Record. The property defaults to the root '/' which corresponds to a 'flat' record (all fields/values at the top level of " +
" the Record.")
.required(true)
.addValidator(new RecordPathValidator())
.defaultValue("/")
.expressionLanguageSupported(NONE)
.build();
static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("charset")
.displayName("Character Set")
.description("Specifies the character set to use when storing record field values as strings. All fields will be converted to strings using this character set "
+ "before being stored in Redis.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("UTF-8")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles having all Records stored in Redis will be routed to this relationship")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles containing Records with processing errors will be routed to this relationship")
.build();
static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new LinkedHashSet<>(Arrays.asList(
REL_SUCCESS,
REL_FAILURE
)));
static {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(RECORD_READER_FACTORY);
props.add(REDIS_CONNECTION_POOL);
props.add(HASH_VALUE_RECORD_PATH);
props.add(DATA_RECORD_PATH);
props.add(CHARSET);
PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
private volatile RedisConnectionPool redisConnectionPool;
@OnScheduled
public void onScheduled(final ProcessContext context) {
this.redisConnectionPool = context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
}
@OnStopped
public void onStopped() {
this.redisConnectionPool = null;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
long count = 0;
try (InputStream in = session.read(flowFile);
RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
RedisConnection redisConnection = redisConnectionPool.getConnection()) {
final String hashValueRecordPathValue = context.getProperty(HASH_VALUE_RECORD_PATH).getValue();
final RecordPath hashValueRecordPath = RecordPath.compile(hashValueRecordPathValue);
final String dataRecordPathValue = context.getProperty(DATA_RECORD_PATH).getValue();
final RecordPath dataRecordPath = RecordPath.compile(dataRecordPathValue);
final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
Record record;
while ((record = reader.nextRecord()) != null) {
final RecordPathResult recordPathResult = hashValueRecordPath.evaluate(record);
final List<FieldValue> resultList = recordPathResult.getSelectedFields().distinct().collect(Collectors.toList());
if (resultList.isEmpty()) {
throw new ProcessException(String.format("No results found for Record [%d] Hash Value Record Path: %s", count, hashValueRecordPath.getPath()));
}
if (resultList.size() > 1) {
throw new ProcessException(String.format("Multiple results [%d] found for Record [%d] Hash Value Record Path: %s", resultList.size(), count, hashValueRecordPath.getPath()));
}
final FieldValue hashValueFieldValue = resultList.get(0);
final Object hashValueObject = hashValueFieldValue.getValue();
if (hashValueObject == null) {
throw new ProcessException(String.format("Null value found for Record [%d] Hash Value Record Path: %s", count, hashValueRecordPath.getPath()));
}
final String hashValue = (String) DataTypeUtils.convertType(hashValueObject, RecordFieldType.STRING.getDataType(), charset.name());
List<Record> dataRecords = getDataRecords(dataRecordPath, record);
count = putDataRecordsToRedis(dataRecords, redisConnection, hashValue, charset, count);
}
} catch (MalformedRecordException e) {
getLogger().error("Read Records failed {}", flowFile, e);
flowFile = session.putAttribute(flowFile, SUCCESS_RECORD_COUNT, String.valueOf(count));
session.transfer(flowFile, REL_FAILURE);
return;
} catch (SchemaNotFoundException e) {
getLogger().error("Record Schema not found {}", flowFile, e);
flowFile = session.putAttribute(flowFile, SUCCESS_RECORD_COUNT, String.valueOf(count));
session.transfer(flowFile, REL_FAILURE);
return;
} catch (Exception e) {
getLogger().error("Put Records failed {}", flowFile, e);
flowFile = session.putAttribute(flowFile, SUCCESS_RECORD_COUNT, String.valueOf(count));
session.transfer(flowFile, REL_FAILURE);
return;
}
flowFile = session.putAttribute(flowFile, SUCCESS_RECORD_COUNT, String.valueOf(count));
session.transfer(flowFile, REL_SUCCESS);
}
private List<Record> getDataRecords(final RecordPath dataRecordPath, final Record outerRecord) {
if (dataRecordPath == null) {
return Collections.singletonList(outerRecord);
}
final RecordPathResult result = dataRecordPath.evaluate(outerRecord);
final List<FieldValue> fieldValues = result.getSelectedFields().collect(Collectors.toList());
if (fieldValues.isEmpty()) {
throw new ProcessException("RecordPath " + dataRecordPath.getPath() + " evaluated against Record yielded no results.");
}
for (final FieldValue fieldValue : fieldValues) {
final RecordFieldType fieldType = fieldValue.getField().getDataType().getFieldType();
if (fieldType != RecordFieldType.RECORD) {
throw new ProcessException("RecordPath " + dataRecordPath.getPath() + " evaluated against Record expected to return one or more Records but encountered field of type" +
" " + fieldType);
}
}
final List<Record> dataRecords = new ArrayList<>(fieldValues.size());
for (final FieldValue fieldValue : fieldValues) {
dataRecords.add((Record) fieldValue.getValue());
}
return dataRecords;
}
private long putDataRecordsToRedis(final List<Record> dataRecords, final RedisConnection redisConnection, final String hashValue, final Charset charset, final long originalCount) {
long count = originalCount;
for (Record dataRecord : dataRecords) {
RecordSchema dataRecordSchema = dataRecord.getSchema();
for (RecordField recordField : dataRecordSchema.getFields()) {
final String fieldName = recordField.getFieldName();
final Object value = dataRecord.getValue(fieldName);
if (fieldName == null || value == null) {
getLogger().debug("Record field missing required elements: name [{}] value [{}]", fieldName, value);
} else {
final String stringValue = (String) DataTypeUtils.convertType(value, RecordFieldType.STRING.getDataType(), charset.name());
redisConnection.hashCommands().hSet(hashValue.getBytes(charset), fieldName.getBytes(charset), stringValue.getBytes(charset));
}
}
count++;
}
return count;
}
}
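
For reference, the per-record behavior above amounts to one HSET per non-null field against the key selected by the Hash Value Record Path. A minimal sketch, assuming an already-obtained Spring Data Redis RedisConnection and a hypothetical record {hash: "abc", name: "John Doe", age: 48}:

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import org.springframework.data.redis.connection.RedisConnection;

class PutRedisHashRecordSketch {
    static void putRecord(final RedisConnection connection) {
        final Charset charset = StandardCharsets.UTF_8;
        // The key comes from the Hash Value Record Path (e.g. /hash); the value "abc" is hypothetical
        final byte[] key = "abc".getBytes(charset);
        // With the default Data Record Path of "/", every top-level field is stored, including the hash field itself
        connection.hashCommands().hSet(key, "hash".getBytes(charset), "abc".getBytes(charset));
        connection.hashCommands().hSet(key, "name".getBytes(charset), "John Doe".getBytes(charset));
        connection.hashCommands().hSet(key, "age".getBytes(charset), "48".getBytes(charset));
        // A field whose value is null is skipped (logged at debug) rather than stored
    }
}

In redis-cli terms this is HSET abc hash abc, HSET abc name "John Doe", and HSET abc age 48, issued one field at a time.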

RedisDistributedMapCacheClientService.java

@@ -29,7 +29,6 @@ import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.redis.RedisConnectionPool;
import org.apache.nifi.redis.RedisType;
import org.apache.nifi.redis.util.RedisAction;
@@ -51,6 +50,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
import static org.apache.nifi.redis.util.RedisUtils.TTL;
@Tags({ "redis", "distributed", "cache", "map" })
@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Redis as the backing cache. This service relies on " +
"the WATCH, MULTI, and EXEC commands in Redis, which are not fully supported when Redis is clustered. As a result, this service " +
@@ -58,22 +60,6 @@ import java.util.concurrent.TimeUnit;
"provide high-availability configurations.")
public class RedisDistributedMapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient<byte[]> {
public static final PropertyDescriptor REDIS_CONNECTION_POOL = new PropertyDescriptor.Builder()
.name("redis-connection-pool")
.displayName("Redis Connection Pool")
.identifiesControllerService(RedisConnectionPool.class)
.required(true)
.build();
public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
.name("redis-cache-ttl")
.displayName("TTL")
.description("Indicates how long the data should exist in Redis. Setting '0 secs' would mean the data would exist forever")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.required(true)
.defaultValue("0 secs")
.build();
static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
static {
final List<PropertyDescriptor> props = new ArrayList<>();

RedisUtils.java

@@ -24,6 +24,7 @@ import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.redis.RedisConnectionPool;
import org.apache.nifi.redis.RedisType;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.util.StringUtils;
@@ -46,6 +47,25 @@ import java.util.concurrent.TimeUnit;
public class RedisUtils {
// These properties are shared among the controller service(s) and processor(s) that use a RedisConnectionPool
public static final PropertyDescriptor REDIS_CONNECTION_POOL = new PropertyDescriptor.Builder()
.name("redis-connection-pool")
.displayName("Redis Connection Pool")
.identifiesControllerService(RedisConnectionPool.class)
.required(true)
.build();
public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
.name("redis-cache-ttl")
.displayName("TTL")
.description("Indicates how long the data should exist in Redis. Setting '0 secs' would mean the data would exist forever")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.required(true)
.defaultValue("0 secs")
.build();
// These properties are shared between the connection pool controller service and the state provider, the name
// is purposely set to be more human-readable since that will be referenced in state-management.xml

META-INF/services/org.apache.nifi.processor.Processor

@@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.redis.processor.PutRedisHashRecord
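
This single-line registration is what makes the new processor discoverable: NiFi locates Processor implementations through java.util.ServiceLoader, reading files of this name from each NAR's classpath. A minimal standalone sketch of that lookup (an illustration of the mechanism, not NiFi's actual extension manager):

import java.util.ServiceLoader;
import org.apache.nifi.processor.Processor;

public class ListProcessors {
    public static void main(final String[] args) {
        // Scans META-INF/services/org.apache.nifi.processor.Processor on the classpath
        // and instantiates each listed implementation via its no-arg constructor
        ServiceLoader.load(Processor.class)
                .forEach(processor -> System.out.println(processor.getClass().getName()));
    }
}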

TestPutRedisHashRecord.java

@@ -0,0 +1,217 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.redis.processor;
import org.apache.nifi.redis.service.RedisConnectionPoolService;
import org.apache.nifi.redis.util.RedisUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.stubbing.Answer;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisHashCommands;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
class TestPutRedisHashRecord {
private TestRunner runner;
private PutRedisHashRecord processor;
private MockRecordParser parser;
private MockRedisConnectionPoolService connectionPoolService;
@BeforeEach
public void setup() throws Exception {
processor = new PutRedisHashRecord();
runner = TestRunners.newTestRunner(processor);
parser = new MockRecordParser();
runner.addControllerService("parser", parser);
runner.enableControllerService(parser);
runner.setProperty(PutRedisHashRecord.RECORD_READER_FACTORY, "parser");
connectionPoolService = new MockRedisConnectionPoolService();
connectionPoolService.setFailAfterN(0);
try {
runner.addControllerService("connectionPool", connectionPoolService);
} catch (InitializationException e) {
throw new IOException(e);
}
runner.setProperty(connectionPoolService, RedisUtils.CONNECTION_STRING, "localhost:6379");
runner.setProperty(RedisUtils.REDIS_CONNECTION_POOL, "connectionPool");
// Tests should provide a field named 'hash' with unique values per record, unless testing failure conditions
runner.setProperty(PutRedisHashRecord.HASH_VALUE_RECORD_PATH, "/hash");
}
@Test
public void testPutRecords() {
runner.assertNotValid();
runner.enableControllerService(connectionPoolService);
parser.addSchemaField("hash", RecordFieldType.STRING);
parser.addSchemaField("name", RecordFieldType.STRING);
parser.addSchemaField("age", RecordFieldType.INT);
parser.addRecord("abc", "John Doe", 48);
parser.addRecord("def", "Jane Doe", 47);
parser.addRecord("ghi", "Jimmy Doe", 14);
runner.enqueue("hello".getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutRedisHashRecord.REL_SUCCESS, 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(PutRedisHashRecord.REL_SUCCESS);
assertEquals(1, result.size());
MockFlowFile ff = result.get(0);
ff.assertAttributeEquals(PutRedisHashRecord.SUCCESS_RECORD_COUNT, "3");
// Verify the content is untouched
ff.assertContentEquals("hello");
}
@Test
public void testPutRecordsFailAfterN() {
runner.assertNotValid();
connectionPoolService.setFailAfterN(5);
runner.enableControllerService(connectionPoolService);
parser.addSchemaField("hash", RecordFieldType.STRING);
parser.addSchemaField("name", RecordFieldType.STRING);
parser.addSchemaField("age", RecordFieldType.INT);
parser.addRecord("abc", "John Doe", 48);
parser.addRecord("def", "Jane Doe", 47);
parser.addRecord("ghi", "Jimmy Doe", 14);
runner.enqueue("hello".getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutRedisHashRecord.REL_FAILURE, 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(PutRedisHashRecord.REL_FAILURE);
assertEquals(1, result.size());
MockFlowFile ff = result.get(0);
ff.assertAttributeEquals(PutRedisHashRecord.SUCCESS_RECORD_COUNT, "1");
// Verify the content is untouched
ff.assertContentEquals("hello");
}
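// A note on the arithmetic above: with the default Data Record Path of "/", each record stores all
// three of its fields (hash, name, age), i.e. three hSet calls per record. Record 1 completes calls
// 1-3 and raises the success count to 1; record 2's third field is call 6 overall, which exceeds
// failAfterN = 5 and throws, so the FlowFile routes to failure with redis.success.record.count = 1.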
@Test
public void testPutRecordsNoHash() {
runner.assertNotValid();
runner.setProperty(PutRedisHashRecord.HASH_VALUE_RECORD_PATH, "/invalid_path");
runner.assertValid(connectionPoolService);
runner.enableControllerService(connectionPoolService);
parser.addSchemaField("hash", RecordFieldType.STRING);
parser.addSchemaField("name", RecordFieldType.STRING);
parser.addSchemaField("age", RecordFieldType.INT);
parser.addRecord("abc", "John Doe", 48);
runner.enqueue("hello".getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutRedisHashRecord.REL_FAILURE, 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(PutRedisHashRecord.REL_FAILURE);
assertEquals(1, result.size());
MockFlowFile ff = result.get(0);
ff.assertAttributeEquals(PutRedisHashRecord.SUCCESS_RECORD_COUNT, "0");
// Verify the content is untouched
ff.assertContentEquals("hello");
}
@Test
public void testPutRecordsNullValue() {
runner.assertNotValid();
runner.enableControllerService(connectionPoolService);
parser.addSchemaField("hash", RecordFieldType.STRING);
parser.addSchemaField("name", RecordFieldType.STRING);
parser.addSchemaField("age", RecordFieldType.INT);
parser.addRecord("abc", "John Doe", 48);
parser.addRecord("def", "Jane Doe", null);
runner.enqueue("hello".getBytes(StandardCharsets.UTF_8));
runner.run();
// FlowFile should be routed to success even though one record contains a null field value
runner.assertAllFlowFilesTransferred(PutRedisHashRecord.REL_SUCCESS, 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(PutRedisHashRecord.REL_SUCCESS);
assertEquals(1, result.size());
MockFlowFile ff = result.get(0);
// Both records are transferred successfully, but the null value was not put into Redis
ff.assertAttributeEquals(PutRedisHashRecord.SUCCESS_RECORD_COUNT, "2");
// Verify the content is untouched
ff.assertContentEquals("hello");
}
private static class MockRedisConnectionPoolService extends RedisConnectionPoolService {
private final Map<String, Map<String, String>> hashStore = new HashMap<>();
private int failAfterN = 0;
private int currentFailures = 0;
@Override
public RedisConnection getConnection() {
currentFailures = 0;
RedisConnection mockRedisConnection = mock(RedisConnection.class);
RedisHashCommands hashCommands = mock(RedisHashCommands.class);
when(hashCommands.hSet(any(byte[].class), any(byte[].class), any(byte[].class))).thenAnswer((Answer<Boolean>) invocationOnMock -> {
currentFailures++;
if (failAfterN > 0 && currentFailures > failAfterN) {
throw new UncheckedIOException(new IOException("error during hset"));
}
final byte[] hashValue = invocationOnMock.getArgument(0);
final byte[] keyValue = invocationOnMock.getArgument(1);
final byte[] valueValue = invocationOnMock.getArgument(2);
if (hashValue == null || keyValue == null || valueValue == null) {
throw new NullPointerException("hash, key, and value must not be null");
}
final String hashString = new String(hashValue);
Map<String, String> kvMap = hashStore.get(hashString);
if (kvMap == null) {
kvMap = new HashMap<>();
}
kvMap.put(new String(keyValue), new String(valueValue));
hashStore.put(hashString, kvMap);
return Boolean.TRUE;
});
when(mockRedisConnection.hashCommands()).thenReturn(hashCommands);
return mockRedisConnection;
}
public void setFailAfterN(int triesBeforeFailure) {
failAfterN = Math.max(triesBeforeFailure, 0);
}
}
}

ITRedisDistributedMapCacheClientService.java

@@ -54,6 +54,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -125,7 +126,7 @@ public class ITRedisDistributedMapCacheClientService {
// create, configure, and enable the RedisDistributedMapCacheClient service
redisMapCacheClientService = new RedisDistributedMapCacheClientService();
testRunner.addControllerService("redis-map-cache-client", redisMapCacheClientService);
testRunner.setProperty(redisMapCacheClientService, RedisDistributedMapCacheClientService.REDIS_CONNECTION_POOL, "redis-connection-pool");
testRunner.setProperty(redisMapCacheClientService, REDIS_CONNECTION_POOL, "redis-connection-pool");
testRunner.enableControllerService(redisMapCacheClientService);
testRunner.setProperty(TestRedisProcessor.REDIS_MAP_CACHE, "redis-map-cache-client");
}
@@ -182,7 +183,7 @@
final String key = "test-redis-processor-" + timestamp;
final String value = "the time is " + timestamp;
// verify the key doesn't exists, put the key/value, then verify it exists
// verify the key doesn't exist, put the key/value, then verify it exists
assertFalse(cacheClient.containsKey(key, stringSerializer));
cacheClient.put(key, value, stringSerializer, stringSerializer);
assertTrue(cacheClient.containsKey(key, stringSerializer));