NIFI-2398 - GetIgnite processor

This closes #721.
This commit is contained in:
mans2singh 2016-07-24 16:18:55 -07:00 committed by Pierre Villard
parent e46fea920a
commit da33e2859c
9 changed files with 817 additions and 50 deletions

View File

@ -18,6 +18,8 @@
*/
package org.apache.nifi.processors.ignite;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
@ -88,20 +90,28 @@ public abstract class AbstractIgniteProcessor extends AbstractProcessor {
public void initializeIgnite(ProcessContext context) {
if ( getIgnite() != null ) {
getLogger().warn("Ignite already initialized");
getLogger().info("Ignite already initialized");
return;
}
Ignition.setClientMode(true);
String configuration = context.getProperty(IGNITE_CONFIGURATION_FILE).getValue();
getLogger().info("Initializing ignite with configuration {} ", new Object[] { configuration });
if ( StringUtils.isEmpty(configuration) ) {
ignite = Ignition.start();
} else {
ignite = Ignition.start(configuration);
synchronized(Ignition.class) {
List<Ignite> grids = Ignition.allGrids();
if ( grids.size() == 1 ) {
getLogger().info("Ignite grid already available");
ignite = grids.get(0);
return;
}
Ignition.setClientMode(true);
String configuration = context.getProperty(IGNITE_CONFIGURATION_FILE).getValue();
getLogger().info("Initializing ignite with configuration {} ", new Object[] { configuration });
if ( StringUtils.isEmpty(configuration) ) {
ignite = Ignition.start();
} else {
ignite = Ignition.start(configuration);
}
}
}
}

View File

@ -18,12 +18,12 @@ package org.apache.nifi.processors.ignite.cache;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.ignite.IgniteCache;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
@ -47,9 +47,17 @@ public abstract class AbstractIgniteCacheProcessor extends AbstractIgniteProcess
.build();
/**
* Property descriptors
* The Ignite cache key attribute
*/
protected static List<PropertyDescriptor> descriptors;
public static final PropertyDescriptor IGNITE_CACHE_ENTRY_KEY = new PropertyDescriptor.Builder()
.displayName("Ignite Cache Entry Identifier")
.name("ignite-cache-entry-identifier")
.description("A FlowFile attribute, or attribute expression used " +
"for determining Ignite cache key for the Flow File content")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.expressionLanguageSupported(true)
.build();
/**
* Relations
@ -57,16 +65,19 @@ public abstract class AbstractIgniteCacheProcessor extends AbstractIgniteProcess
protected static Set<Relationship> relationships;
/**
* Ignite cache instance
* Ignite cache name
*/
private transient IgniteCache<String,byte[]> igniteCache;
private String cacheName;
/**
* Get ignite cache instance
* @return ignite cache instance
*/
protected IgniteCache<String, byte[]> getIgniteCache() {
return igniteCache;
if ( getIgnite() == null )
return null;
else
return getIgnite().getOrCreateCache(cacheName);
}
static {
@ -96,8 +107,7 @@ public abstract class AbstractIgniteCacheProcessor extends AbstractIgniteProcess
super.initializeIgnite(context);
}
String cacheName = context.getProperty(CACHE_NAME).getValue();
igniteCache = getIgnite().getOrCreateCache(cacheName);
cacheName = context.getProperty(CACHE_NAME).getValue();
} catch (Exception e) {
getLogger().error("Failed to initialize ignite cache due to {}", new Object[] { e }, e);
@ -108,12 +118,11 @@ public abstract class AbstractIgniteCacheProcessor extends AbstractIgniteProcess
/**
* Close Ignite cache instance and calls base class closeIgnite
*/
@OnStopped
@OnShutdown
public void closeIgniteCache() {
if (igniteCache != null) {
if (getIgniteCache() != null) {
getLogger().info("Closing ignite cache");
igniteCache.close();
igniteCache = null;
getIgniteCache().close();
}
super.closeIgnite();
}

View File

@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.ignite.cache;
import java.io.ByteArrayInputStream;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
/**
* Get cache processors which gets byte array for the key from Ignite cache and set the array
* as the FlowFile content.
*/
@EventDriven
@SupportsBatching
@Tags({ "Ignite", "get", "read", "cache", "key" })
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Get the byte array from Ignite Cache and adds it as the content of a FlowFile." +
"The processor uses the value of FlowFile attribute (Ignite cache entry key) as the cache key lookup. " +
"If the entry corresponding to the key is not found in the cache an error message is associated with the FlowFile " +
"Note - The Ignite Kernel periodically outputs node performance statistics to the logs. This message " +
" can be turned off by setting the log level for logger 'org.apache.ignite' to WARN in the logback.xml configuration file.")
@WritesAttributes({
@WritesAttribute(attribute = GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, description = "The reason for getting entry from cache"),
})
@SeeAlso({PutIgniteCache.class})
public class GetIgniteCache extends AbstractIgniteCacheProcessor {
/** Flow file attribute keys and messages */
public static final String IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY = "ignite.cache.get.failed.reason";
public static final String IGNITE_GET_FAILED_MISSING_KEY_MESSAGE = "The FlowFile key attribute was missing";
public static final String IGNITE_GET_FAILED_MISSING_ENTRY_MESSAGE = "The cache byte array entry was null or zero length";
public static final String IGNITE_GET_FAILED_MESSAGE_PREFIX = "The cache request failed because of ";
/**
* Property descriptors
*/
protected static final List<PropertyDescriptor> descriptors =
Arrays.asList(IGNITE_CONFIGURATION_FILE, CACHE_NAME, IGNITE_CACHE_ENTRY_KEY);
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public final void initialize(ProcessContext context) throws ProcessException {
super.initializeIgniteCache(context);
}
/**
* Handle flow file and gets the entry from the cache based on the key attribute
*/
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
String key = context.getProperty(IGNITE_CACHE_ENTRY_KEY).evaluateAttributeExpressions(flowFile).getValue();
if ( StringUtils.isEmpty(key) ) {
flowFile = session.putAttribute(flowFile, IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, IGNITE_GET_FAILED_MISSING_KEY_MESSAGE);
session.transfer(flowFile, REL_FAILURE);
} else {
try {
byte [] value = getIgniteCache().get(key);
if ( value == null || value.length == 0 ) {
flowFile = session.putAttribute(flowFile, IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY,
IGNITE_GET_FAILED_MISSING_ENTRY_MESSAGE);
session.transfer(flowFile, REL_FAILURE);
} else {
ByteArrayInputStream bais = new ByteArrayInputStream(value);
flowFile = session.importFrom(bais, flowFile);
session.transfer(flowFile,REL_SUCCESS);
}
} catch(Exception exception) {
flowFile = session.putAttribute(flowFile, IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY,
IGNITE_GET_FAILED_MESSAGE_PREFIX + exception);
getLogger().error("Failed to get value for key {} from IgniteDB due to {}", new Object[] { key, exception }, exception);
session.transfer(flowFile, REL_FAILURE);
context.yield();
}
}
}
}

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -34,12 +35,13 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@ -71,6 +73,7 @@ import org.apache.nifi.stream.io.StreamUtils;
@WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_COUNT, description = "The total number of failed FlowFiles in the batch"),
@WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY, description = "The failed reason attribute key")
})
@SeeAlso({GetIgniteCache.class})
public class PutIgniteCache extends AbstractIgniteCacheProcessor {
/**
@ -138,16 +141,6 @@ public class PutIgniteCache extends AbstractIgniteCacheProcessor {
.sensitive(false)
.build();
public static final PropertyDescriptor IGNITE_CACHE_ENTRY_KEY = new PropertyDescriptor.Builder()
.displayName("Ignite Cache Entry Identifier")
.name("ignite-cache-entry-identifier")
.description("A FlowFile attribute, or attribute expression used " +
"for determining Ignite cache key for the Flow File content")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.expressionLanguageSupported(true)
.build();
/** Flow file attribute keys and messages */
public static final String IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT = "ignite.cache.batch.flow.file.total.count";
public static final String IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER = "ignite.cache.batch.flow.file.item.number";
@ -160,17 +153,15 @@ public class PutIgniteCache extends AbstractIgniteCacheProcessor {
public static final String IGNITE_BATCH_FLOW_FILE_FAILED_MISSING_KEY_MESSAGE = "The FlowFile key attribute was missing";
public static final String IGNITE_BATCH_FLOW_FILE_FAILED_ZERO_SIZE_MESSAGE = "The FlowFile size was zero";
static {
descriptors = new ArrayList<>();
descriptors.add(IGNITE_CONFIGURATION_FILE);
descriptors.add(CACHE_NAME);
descriptors.add(BATCH_SIZE);
descriptors.add(IGNITE_CACHE_ENTRY_KEY);
descriptors.add(DATA_STREAMER_PER_NODE_PARALLEL_OPERATIONS);
descriptors.add(DATA_STREAMER_PER_NODE_BUFFER_SIZE);
descriptors.add(DATA_STREAMER_AUTO_FLUSH_FREQUENCY);
descriptors.add(DATA_STREAMER_ALLOW_OVERRIDE);
}
/**
* Property descriptors
*/
protected static final List<PropertyDescriptor> descriptors =
Arrays.asList(IGNITE_CONFIGURATION_FILE,CACHE_NAME,BATCH_SIZE,
IGNITE_CACHE_ENTRY_KEY,
DATA_STREAMER_PER_NODE_PARALLEL_OPERATIONS,
DATA_STREAMER_PER_NODE_BUFFER_SIZE,
DATA_STREAMER_AUTO_FLUSH_FREQUENCY,DATA_STREAMER_ALLOW_OVERRIDE);
/**
* Data streamer instance
@ -190,9 +181,13 @@ public class PutIgniteCache extends AbstractIgniteCacheProcessor {
if (igniteDataStreamer != null) {
getLogger().info("Closing ignite data streamer");
igniteDataStreamer.flush();
igniteDataStreamer.close();
igniteDataStreamer = null;
}
}
@OnShutdown
public final void closeIgniteDataStreamerAndCache() {
closeIgniteDataStreamer();
super.closeIgniteCache();
}

View File

@ -13,3 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.ignite.cache.PutIgniteCache
org.apache.nifi.processors.ignite.cache.GetIgniteCache

View File

@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.ignite.cache;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.ignite.IgniteCache;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
public class ITGetIgniteCache {
private static final String CACHE_NAME = "testCache";
private static TestRunner runner;
private static GetIgniteCache getIgniteCache;
private static Map<String,String> properties1;
private static HashMap<String, String> properties2;
@BeforeClass
public static void setUp() throws IOException {
getIgniteCache = new GetIgniteCache();
properties1 = new HashMap<String,String>();
properties2 = new HashMap<String,String>();
}
@AfterClass
public static void teardown() {
runner = null;
IgniteCache<String, byte[]> cache = getIgniteCache.getIgniteCache();
if (cache != null )
cache.destroy();
getIgniteCache = null;
}
@Test
public void testgetIgniteCacheOnTriggerFileConfigurationOneFlowFile() throws IOException, InterruptedException {
runner = TestRunners.newTestRunner(getIgniteCache);
runner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME);
runner.setProperty(GetIgniteCache.IGNITE_CONFIGURATION_FILE,
"file:///" + new File(".").getAbsolutePath() + "/src/test/resources/test-ignite.xml");
runner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
runner.assertValid();
properties1.put("igniteKey", "key5");
runner.enqueue("test5".getBytes(),properties1);
getIgniteCache.initialize(runner.getProcessContext());
getIgniteCache.getIgniteCache().put("key5", "test5".getBytes());
runner.run(1, false, true);
runner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 1);
List<MockFlowFile> sucessfulFlowFiles = runner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
assertEquals(1, sucessfulFlowFiles.size());
List<MockFlowFile> failureFlowFiles = runner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
assertEquals(0, failureFlowFiles.size());
final MockFlowFile out = runner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0);
out.assertContentEquals("test5".getBytes());
Assert.assertArrayEquals("test5".getBytes(),(byte[])getIgniteCache.getIgniteCache().get("key5"));
runner.shutdown();
}
@Test
public void testgetIgniteCacheOnTriggerNoConfigurationTwoFlowFile() throws IOException, InterruptedException {
runner = TestRunners.newTestRunner(getIgniteCache);
runner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME);
runner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
runner.assertValid();
properties1.put("igniteKey", "key51");
runner.enqueue("test1".getBytes(),properties1);
properties2.put("igniteKey", "key52");
runner.enqueue("test2".getBytes(),properties2);
getIgniteCache.initialize(runner.getProcessContext());
getIgniteCache.getIgniteCache().put("key51", "test51".getBytes());
getIgniteCache.getIgniteCache().put("key52", "test52".getBytes());
runner.run(2, false, true);
runner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 2);
List<MockFlowFile> sucessfulFlowFiles = runner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
assertEquals(2, sucessfulFlowFiles.size());
List<MockFlowFile> failureFlowFiles = runner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
assertEquals(0, failureFlowFiles.size());
final MockFlowFile out = runner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0);
out.assertContentEquals("test51".getBytes());
Assert.assertArrayEquals("test51".getBytes(),
(byte[])getIgniteCache.getIgniteCache().get("key51"));
final MockFlowFile out2 = runner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(1);
out2.assertContentEquals("test52".getBytes());
Assert.assertArrayEquals("test52".getBytes(),
(byte[])getIgniteCache.getIgniteCache().get("key52"));
runner.shutdown();
}
@Test
public void testgetIgniteCacheOnTriggerNoConfigurationTwoFlowFileStopStart2Times() throws IOException, InterruptedException {
runner = TestRunners.newTestRunner(getIgniteCache);
runner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME);
runner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
runner.assertValid();
properties1.put("igniteKey", "key51");
runner.enqueue("test1".getBytes(),properties1);
properties2.put("igniteKey", "key52");
runner.enqueue("test2".getBytes(),properties2);
getIgniteCache.initialize(runner.getProcessContext());
getIgniteCache.getIgniteCache().put("key51", "test51".getBytes());
getIgniteCache.getIgniteCache().put("key52", "test52".getBytes());
runner.run(2, false, true);
runner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 2);
getIgniteCache.closeIgniteCache();
runner.clearTransferState();
// reinit and check first time
runner.assertValid();
properties1.put("igniteKey", "key51");
runner.enqueue("test1".getBytes(),properties1);
properties2.put("igniteKey", "key52");
runner.enqueue("test2".getBytes(),properties2);
getIgniteCache.initialize(runner.getProcessContext());
runner.run(2, false, true);
runner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 2);
getIgniteCache.closeIgniteCache();
runner.clearTransferState();
// reinit and check second time
runner.assertValid();
properties1.put("igniteKey", "key51");
runner.enqueue("test1".getBytes(),properties1);
properties2.put("igniteKey", "key52");
runner.enqueue("test2".getBytes(),properties2);
getIgniteCache.initialize(runner.getProcessContext());
runner.run(2, false, true);
runner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 2);
getIgniteCache.closeIgniteCache();
runner.clearTransferState();
}
}

View File

@ -50,6 +50,7 @@ public class ITPutIgniteCache {
@AfterClass
public static void teardown() {
runner = null;
putIgniteCache.getIgniteCache().destroy();
putIgniteCache = null;
}
@ -82,8 +83,9 @@ public class ITPutIgniteCache {
out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "0");
out.assertContentEquals("test".getBytes());
System.out.println("Value was: " + new String((byte[])putIgniteCache.getIgniteCache().get("key5")));
Assert.assertArrayEquals("test".getBytes(),(byte[])putIgniteCache.getIgniteCache().get("key5"));
runner.shutdown();
putIgniteCache.getIgniteCache().remove("key5");
}
@Test
@ -115,6 +117,7 @@ public class ITPutIgniteCache {
out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "0");
out.assertContentEquals("test1".getBytes());
System.out.println("value was " + new String(putIgniteCache.getIgniteCache().get("key51")));
Assert.assertArrayEquals("test1".getBytes(),(byte[])putIgniteCache.getIgniteCache().get("key51"));
final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS).get(1);
@ -126,7 +129,67 @@ public class ITPutIgniteCache {
out2.assertContentEquals("test2".getBytes());
Assert.assertArrayEquals("test2".getBytes(),(byte[])putIgniteCache.getIgniteCache().get("key52"));
putIgniteCache.getIgniteCache().remove("key52");
putIgniteCache.getIgniteCache().remove("key51");
}
@Test
public void testPutIgniteCacheOnTriggerNoConfigurationTwoFlowFileStopAndStart2Times() throws IOException, InterruptedException {
runner = TestRunners.newTestRunner(putIgniteCache);
runner.setProperty(PutIgniteCache.BATCH_SIZE, "5");
runner.setProperty(PutIgniteCache.CACHE_NAME, CACHE_NAME);
runner.setProperty(PutIgniteCache.DATA_STREAMER_PER_NODE_BUFFER_SIZE, "1");
runner.setProperty(PutIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
runner.assertValid();
properties1.put("igniteKey", "key51");
runner.enqueue("test1".getBytes(),properties1);
properties2.put("igniteKey", "key52");
runner.enqueue("test2".getBytes(),properties2);
runner.run(1, false, true);
putIgniteCache.getIgniteCache().remove("key51");
putIgniteCache.getIgniteCache().remove("key52");
runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_SUCCESS, 2);
putIgniteCache.getIgniteCache().remove("key52");
putIgniteCache.getIgniteCache().remove("key52");
// Close and restart first time
putIgniteCache.closeIgniteDataStreamer();
runner.clearTransferState();
putIgniteCache.initilizeIgniteDataStreamer(runner.getProcessContext());
runner.assertValid();
properties1.put("igniteKey", "key51");
runner.enqueue("test1".getBytes(),properties1);
properties2.put("igniteKey", "key52");
runner.enqueue("test2".getBytes(),properties2);
runner.run(1, false, true);
runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_SUCCESS, 2);
putIgniteCache.getIgniteCache().remove("key51");
putIgniteCache.getIgniteCache().remove("key52");
// Close and restart second time
putIgniteCache.closeIgniteDataStreamer();
runner.clearTransferState();
putIgniteCache.initilizeIgniteDataStreamer(runner.getProcessContext());
runner.assertValid();
properties1.put("igniteKey", "key51");
runner.enqueue("test1".getBytes(),properties1);
properties2.put("igniteKey", "key52");
runner.enqueue("test2".getBytes(),properties2);
runner.run(1, false, true);
runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_SUCCESS, 2);
putIgniteCache.getIgniteCache().remove("key52");
putIgniteCache.getIgniteCache().remove("key51");
runner.shutdown();
}
}

View File

@ -0,0 +1,383 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.ignite.cache;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestGetIgniteCache {
private static final String CACHE_NAME = "testCache";
private TestRunner getRunner;
private GetIgniteCache getIgniteCache;
private Map<String,String> properties1;
private Map<String,String> properties2;
private static Ignite ignite;
@BeforeClass
public static void setUpClass() {
ignite = Ignition.start("test-ignite.xml");
}
@AfterClass
public static void tearDownClass() {
ignite.close();
Ignition.stop(true);
}
@Before
public void setUp() throws IOException {
getIgniteCache = new GetIgniteCache() {
@Override
protected Ignite getIgnite() {
return TestGetIgniteCache.ignite;
}
};
properties1 = new HashMap<String,String>();
properties1.put("igniteKey", "key1");
properties2 = new HashMap<String,String>();
properties2.put("igniteKey", "key2");
}
@After
public void teardown() {
getRunner = null;
ignite.destroyCache(CACHE_NAME);
}
@Test
public void testGetIgniteCacheDefaultConfOneFlowFileWithPlainKey() throws IOException, InterruptedException {
getRunner = TestRunners.newTestRunner(getIgniteCache);
getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "mykey");
getRunner.assertValid();
getRunner.enqueue(new byte[] {});
getIgniteCache.initialize(getRunner.getProcessContext());
getIgniteCache.getIgniteCache().put("mykey", "test".getBytes());
getRunner.run(1, false, true);
getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 1);
List<MockFlowFile> getSucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
assertEquals(1, getSucessfulFlowFiles.size());
List<MockFlowFile> getFailureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
assertEquals(0, getFailureFlowFiles.size());
final MockFlowFile getOut = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0);
getOut.assertContentEquals("test".getBytes());
getRunner.shutdown();
}
@Test
public void testGetIgniteCacheNullGetCacheThrowsException() throws IOException, InterruptedException {
getIgniteCache = new GetIgniteCache() {
@Override
protected Ignite getIgnite() {
return TestGetIgniteCache.ignite;
}
@Override
protected IgniteCache<String, byte[]> getIgniteCache() {
return null;
}
};
getRunner = TestRunners.newTestRunner(getIgniteCache);
getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "mykey");
getRunner.assertValid();
getRunner.enqueue(new byte[] {});
getIgniteCache.initialize(getRunner.getProcessContext());
getRunner.run(1, false, true);
getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_FAILURE, 1);
List<MockFlowFile> getSucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
assertEquals(0, getSucessfulFlowFiles.size());
List<MockFlowFile> getFailureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
assertEquals(1, getFailureFlowFiles.size());
final MockFlowFile getOut = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE).get(0);
getOut.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY,
GetIgniteCache.IGNITE_GET_FAILED_MESSAGE_PREFIX + "java.lang.NullPointerException");
getRunner.shutdown();
}
@Test
public void testGetIgniteCacheDefaultConfOneFlowFileWithKeyExpression() throws IOException, InterruptedException {
getRunner = TestRunners.newTestRunner(getIgniteCache);
getRunner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME);
getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
getRunner.assertValid();
getRunner.enqueue("".getBytes(),properties1);
getIgniteCache.initialize(getRunner.getProcessContext());
getIgniteCache.getIgniteCache().put("key1", "test".getBytes());
getRunner.run(1, false, true);
getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 1);
List<MockFlowFile> sucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
assertEquals(1, sucessfulFlowFiles.size());
List<MockFlowFile> failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
assertEquals(0, failureFlowFiles.size());
final MockFlowFile out = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0);
out.assertContentEquals("test".getBytes());
getRunner.shutdown();
}
@Test
public void testGetIgniteCacheDefaultConfTwoFlowFilesWithExpressionKeys() throws IOException, InterruptedException {
getRunner = TestRunners.newTestRunner(getIgniteCache);
getRunner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME);
getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
getRunner.assertValid();
getRunner.enqueue("".getBytes(),properties1);
getRunner.enqueue("".getBytes(),properties2);
getIgniteCache.initialize(getRunner.getProcessContext());
getIgniteCache.getIgniteCache().put("key1", "test1".getBytes());
getIgniteCache.getIgniteCache().put("key2", "test2".getBytes());
getRunner.run(2, false, true);
getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 2);
List<MockFlowFile> sucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
assertEquals(2, sucessfulFlowFiles.size());
List<MockFlowFile> failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
assertEquals(0, failureFlowFiles.size());
final MockFlowFile out1 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0);
out1.assertContentEquals("test1".getBytes());
Assert.assertEquals("test1",new String(getIgniteCache.getIgniteCache().get("key1")));
final MockFlowFile out2 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(1);
out2.assertContentEquals("test2".getBytes());
Assert.assertArrayEquals("test2".getBytes(),(byte[])getIgniteCache.getIgniteCache().get("key2"));
getRunner.shutdown();
}
@Test
public void testGetIgniteCacheDefaultConfOneFlowFileNoKey() throws IOException, InterruptedException {
getRunner = TestRunners.newTestRunner(getIgniteCache);
getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
getRunner.assertValid();
properties1.clear();
getRunner.enqueue("".getBytes(),properties1);
getIgniteCache.initialize(getRunner.getProcessContext());
getRunner.run(1, false, true);
getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_FAILURE, 1);
List<MockFlowFile> sucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
assertEquals(0, sucessfulFlowFiles.size());
List<MockFlowFile> failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
assertEquals(1, failureFlowFiles.size());
final MockFlowFile out = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE).get(0);
out.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, GetIgniteCache.IGNITE_GET_FAILED_MISSING_KEY_MESSAGE);
getRunner.shutdown();
}
@Test
public void testGetIgniteCacheDefaultConfTwoFlowFilesNoKey() throws IOException, InterruptedException {
getRunner = TestRunners.newTestRunner(getIgniteCache);
getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
getRunner.assertValid();
properties1.clear();
getRunner.enqueue("".getBytes(),properties1);
getRunner.enqueue("".getBytes(),properties1);
getIgniteCache.initialize(getRunner.getProcessContext());
getRunner.run(2, false, true);
getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_FAILURE, 2);
List<MockFlowFile> sucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
assertEquals(0, sucessfulFlowFiles.size());
List<MockFlowFile> failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
assertEquals(2, failureFlowFiles.size());
final MockFlowFile out1 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE).get(0);
out1.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, GetIgniteCache.IGNITE_GET_FAILED_MISSING_KEY_MESSAGE);
final MockFlowFile out2 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE).get(1);
out2.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, GetIgniteCache.IGNITE_GET_FAILED_MISSING_KEY_MESSAGE);
getRunner.shutdown();
}
@Test
public void testGetIgniteCacheDefaultConfTwoFlowFileFirstNoKey() throws IOException, InterruptedException {
getRunner = TestRunners.newTestRunner(getIgniteCache);
getRunner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME);
getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
getRunner.assertValid();
getRunner.enqueue("".getBytes());
getRunner.enqueue("".getBytes(),properties2);
getIgniteCache.initialize(getRunner.getProcessContext());
getIgniteCache.getIgniteCache().put("key2", "test2".getBytes());
getRunner.run(2, false, true);
List<MockFlowFile> sucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
assertEquals(1, sucessfulFlowFiles.size());
List<MockFlowFile> failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
assertEquals(1, failureFlowFiles.size());
final MockFlowFile out1 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE).get(0);
out1.assertContentEquals("".getBytes());
out1.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, GetIgniteCache.IGNITE_GET_FAILED_MISSING_KEY_MESSAGE);
final MockFlowFile out2 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0);
out2.assertContentEquals("test2".getBytes());
Assert.assertArrayEquals("test2".getBytes(),(byte[])getIgniteCache.getIgniteCache().get("key2"));
getRunner.shutdown();
}
@Test
public void testGetIgniteCacheDefaultConfTwoFlowFileSecondNoKey() throws IOException, InterruptedException {
getRunner = TestRunners.newTestRunner(getIgniteCache);
getRunner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME);
getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
getRunner.assertValid();
getRunner.enqueue("".getBytes(),properties1);
getRunner.enqueue("".getBytes());
getIgniteCache.initialize(getRunner.getProcessContext());
getIgniteCache.getIgniteCache().put("key1", "test1".getBytes());
getRunner.run(2, false, true);
List<MockFlowFile> sucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
assertEquals(1, sucessfulFlowFiles.size());
List<MockFlowFile> failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
assertEquals(1, failureFlowFiles.size());
final MockFlowFile out1 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE).get(0);
out1.assertContentEquals("".getBytes());
out1.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, GetIgniteCache.IGNITE_GET_FAILED_MISSING_KEY_MESSAGE);
final MockFlowFile out2 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0);
out2.assertContentEquals("test1".getBytes());
Assert.assertArrayEquals("test1".getBytes(),(byte[])getIgniteCache.getIgniteCache().get("key1"));
getRunner.shutdown();
}
@Test
public void testGetIgniteCacheDefaultConfThreeFlowFilesOneOkSecondOkThirdNoExpressionKey() throws IOException, InterruptedException {
getRunner = TestRunners.newTestRunner(getIgniteCache);
getRunner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME);
getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
getRunner.assertValid();
getRunner.enqueue("".getBytes(),properties1);
getRunner.enqueue("".getBytes(),properties2);
getRunner.enqueue("".getBytes());
getIgniteCache.initialize(getRunner.getProcessContext());
getIgniteCache.getIgniteCache().put("key1", "test1".getBytes());
getIgniteCache.getIgniteCache().put("key2", "test2".getBytes());
getRunner.run(3, false, true);
List<MockFlowFile> sucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
assertEquals(2, sucessfulFlowFiles.size());
List<MockFlowFile> failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
assertEquals(1, failureFlowFiles.size());
final MockFlowFile out1 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE).get(0);
out1.assertContentEquals("".getBytes());
out1.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, GetIgniteCache.IGNITE_GET_FAILED_MISSING_KEY_MESSAGE);
final MockFlowFile out2 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0);
out2.assertContentEquals("test1".getBytes());
Assert.assertArrayEquals("test1".getBytes(),(byte[])getIgniteCache.getIgniteCache().get("key1"));
final MockFlowFile out3 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(1);
out3.assertContentEquals("test2".getBytes());
Assert.assertArrayEquals("test2".getBytes(),(byte[])getIgniteCache.getIgniteCache().get("key2"));
getRunner.shutdown();
}
}

View File

@ -47,13 +47,18 @@ public class TestPutIgniteCache {
@BeforeClass
public static void setUpClass() {
ignite = Ignition.start("test-ignite.xml");
List<Ignite> grids = Ignition.allGrids();
if ( grids.size() == 1 )
ignite = grids.get(0);
else
ignite = Ignition.start("test-ignite.xml");
}
@AfterClass
public static void tearDownClass() {
ignite.close();
if ( ignite != null )
ignite.close();
Ignition.stop(true);
}