From da33e2859ce45321d28901e5820c38a37dcfc709 Mon Sep 17 00:00:00 2001 From: mans2singh Date: Sun, 24 Jul 2016 16:18:55 -0700 Subject: [PATCH] NIFI-2398 - GetIgnite processor This closes #721. --- .../ignite/AbstractIgniteProcessor.java | 30 +- .../cache/AbstractIgniteCacheProcessor.java | 35 +- .../ignite/cache/GetIgniteCache.java | 117 ++++++ .../ignite/cache/PutIgniteCache.java | 41 +- .../org.apache.nifi.processor.Processor | 1 + .../ignite/cache/ITGetIgniteCache.java | 184 +++++++++ .../ignite/cache/ITPutIgniteCache.java | 67 ++- .../ignite/cache/TestGetIgniteCache.java | 383 ++++++++++++++++++ .../ignite/cache/TestPutIgniteCache.java | 9 +- 9 files changed, 817 insertions(+), 50 deletions(-) create mode 100644 nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/GetIgniteCache.java create mode 100644 nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITGetIgniteCache.java create mode 100644 nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestGetIgniteCache.java diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/AbstractIgniteProcessor.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/AbstractIgniteProcessor.java index 1feaf83d27..28b7b52f5c 100644 --- a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/AbstractIgniteProcessor.java +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/AbstractIgniteProcessor.java @@ -18,6 +18,8 @@ */ package org.apache.nifi.processors.ignite; +import java.util.List; + import org.apache.commons.lang3.StringUtils; import org.apache.ignite.Ignite; import org.apache.ignite.Ignition; @@ -88,20 +90,28 @@ public abstract class AbstractIgniteProcessor extends AbstractProcessor { public void initializeIgnite(ProcessContext context) { if ( getIgnite() != null ) { - getLogger().warn("Ignite already initialized"); + getLogger().info("Ignite already initialized"); return; } - Ignition.setClientMode(true); - String configuration = context.getProperty(IGNITE_CONFIGURATION_FILE).getValue(); - getLogger().info("Initializing ignite with configuration {} ", new Object[] { configuration }); - if ( StringUtils.isEmpty(configuration) ) { - ignite = Ignition.start(); - } else { - ignite = Ignition.start(configuration); + synchronized(Ignition.class) { + List grids = Ignition.allGrids(); + + if ( grids.size() == 1 ) { + getLogger().info("Ignite grid already available"); + ignite = grids.get(0); + return; + } + Ignition.setClientMode(true); + + String configuration = context.getProperty(IGNITE_CONFIGURATION_FILE).getValue(); + getLogger().info("Initializing ignite with configuration {} ", new Object[] { configuration }); + if ( StringUtils.isEmpty(configuration) ) { + ignite = Ignition.start(); + } else { + ignite = Ignition.start(configuration); + } } - } - } diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java index 8e1a7cb008..ca6136c8ee 100644 --- a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java @@ -18,12 +18,12 @@ package org.apache.nifi.processors.ignite.cache; import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.Set; import org.apache.ignite.IgniteCache; -import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression.ResultType; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; @@ -47,9 +47,17 @@ public abstract class AbstractIgniteCacheProcessor extends AbstractIgniteProcess .build(); /** - * Property descriptors + * The Ignite cache key attribute */ - protected static List descriptors; + public static final PropertyDescriptor IGNITE_CACHE_ENTRY_KEY = new PropertyDescriptor.Builder() + .displayName("Ignite Cache Entry Identifier") + .name("ignite-cache-entry-identifier") + .description("A FlowFile attribute, or attribute expression used " + + "for determining Ignite cache key for the Flow File content") + .required(true) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true)) + .expressionLanguageSupported(true) + .build(); /** * Relations @@ -57,16 +65,19 @@ public abstract class AbstractIgniteCacheProcessor extends AbstractIgniteProcess protected static Set relationships; /** - * Ignite cache instance + * Ignite cache name */ - private transient IgniteCache igniteCache; + private String cacheName; /** * Get ignite cache instance * @return ignite cache instance */ protected IgniteCache getIgniteCache() { - return igniteCache; + if ( getIgnite() == null ) + return null; + else + return getIgnite().getOrCreateCache(cacheName); } static { @@ -96,8 +107,7 @@ public abstract class AbstractIgniteCacheProcessor extends AbstractIgniteProcess super.initializeIgnite(context); } - String cacheName = context.getProperty(CACHE_NAME).getValue(); - igniteCache = getIgnite().getOrCreateCache(cacheName); + cacheName = context.getProperty(CACHE_NAME).getValue(); } catch (Exception e) { getLogger().error("Failed to initialize ignite cache due to {}", new Object[] { e }, e); @@ -108,12 +118,11 @@ public abstract class AbstractIgniteCacheProcessor extends AbstractIgniteProcess /** * Close Ignite cache instance and calls base class closeIgnite */ - @OnStopped + @OnShutdown public void closeIgniteCache() { - if (igniteCache != null) { + if (getIgniteCache() != null) { getLogger().info("Closing ignite cache"); - igniteCache.close(); - igniteCache = null; + getIgniteCache().close(); } super.closeIgnite(); } diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/GetIgniteCache.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/GetIgniteCache.java new file mode 100644 index 0000000000..77fc05551e --- /dev/null +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/GetIgniteCache.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.ignite.cache; + +import java.io.ByteArrayInputStream; +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +/** + * Get cache processors which gets byte array for the key from Ignite cache and set the array + * as the FlowFile content. + */ +@EventDriven +@SupportsBatching +@Tags({ "Ignite", "get", "read", "cache", "key" }) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Get the byte array from Ignite Cache and adds it as the content of a FlowFile." + + "The processor uses the value of FlowFile attribute (Ignite cache entry key) as the cache key lookup. " + + "If the entry corresponding to the key is not found in the cache an error message is associated with the FlowFile " + + "Note - The Ignite Kernel periodically outputs node performance statistics to the logs. This message " + + " can be turned off by setting the log level for logger 'org.apache.ignite' to WARN in the logback.xml configuration file.") +@WritesAttributes({ + @WritesAttribute(attribute = GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, description = "The reason for getting entry from cache"), + }) +@SeeAlso({PutIgniteCache.class}) +public class GetIgniteCache extends AbstractIgniteCacheProcessor { + + /** Flow file attribute keys and messages */ + public static final String IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY = "ignite.cache.get.failed.reason"; + public static final String IGNITE_GET_FAILED_MISSING_KEY_MESSAGE = "The FlowFile key attribute was missing"; + public static final String IGNITE_GET_FAILED_MISSING_ENTRY_MESSAGE = "The cache byte array entry was null or zero length"; + public static final String IGNITE_GET_FAILED_MESSAGE_PREFIX = "The cache request failed because of "; + + /** + * Property descriptors + */ + protected static final List descriptors = + Arrays.asList(IGNITE_CONFIGURATION_FILE, CACHE_NAME, IGNITE_CACHE_ENTRY_KEY); + + @Override + public List getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public final void initialize(ProcessContext context) throws ProcessException { + super.initializeIgniteCache(context); + } + + /** + * Handle flow file and gets the entry from the cache based on the key attribute + */ + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + + if (flowFile == null) { + return; + } + + String key = context.getProperty(IGNITE_CACHE_ENTRY_KEY).evaluateAttributeExpressions(flowFile).getValue(); + if ( StringUtils.isEmpty(key) ) { + flowFile = session.putAttribute(flowFile, IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, IGNITE_GET_FAILED_MISSING_KEY_MESSAGE); + session.transfer(flowFile, REL_FAILURE); + } else { + try { + byte [] value = getIgniteCache().get(key); + if ( value == null || value.length == 0 ) { + flowFile = session.putAttribute(flowFile, IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, + IGNITE_GET_FAILED_MISSING_ENTRY_MESSAGE); + session.transfer(flowFile, REL_FAILURE); + } else { + ByteArrayInputStream bais = new ByteArrayInputStream(value); + flowFile = session.importFrom(bais, flowFile); + session.transfer(flowFile,REL_SUCCESS); + } + } catch(Exception exception) { + flowFile = session.putAttribute(flowFile, IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, + IGNITE_GET_FAILED_MESSAGE_PREFIX + exception); + getLogger().error("Failed to get value for key {} from IgniteDB due to {}", new Object[] { key, exception }, exception); + session.transfer(flowFile, REL_FAILURE); + context.yield(); + } + } + } +} diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/PutIgniteCache.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/PutIgniteCache.java index 316ed8f9bf..2d1471ecd4 100644 --- a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/PutIgniteCache.java +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/PutIgniteCache.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.AbstractMap; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -34,12 +35,13 @@ import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.expression.AttributeExpression.ResultType; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -71,6 +73,7 @@ import org.apache.nifi.stream.io.StreamUtils; @WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_COUNT, description = "The total number of failed FlowFiles in the batch"), @WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY, description = "The failed reason attribute key") }) +@SeeAlso({GetIgniteCache.class}) public class PutIgniteCache extends AbstractIgniteCacheProcessor { /** @@ -138,16 +141,6 @@ public class PutIgniteCache extends AbstractIgniteCacheProcessor { .sensitive(false) .build(); - public static final PropertyDescriptor IGNITE_CACHE_ENTRY_KEY = new PropertyDescriptor.Builder() - .displayName("Ignite Cache Entry Identifier") - .name("ignite-cache-entry-identifier") - .description("A FlowFile attribute, or attribute expression used " + - "for determining Ignite cache key for the Flow File content") - .required(true) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true)) - .expressionLanguageSupported(true) - .build(); - /** Flow file attribute keys and messages */ public static final String IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT = "ignite.cache.batch.flow.file.total.count"; public static final String IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER = "ignite.cache.batch.flow.file.item.number"; @@ -160,17 +153,15 @@ public class PutIgniteCache extends AbstractIgniteCacheProcessor { public static final String IGNITE_BATCH_FLOW_FILE_FAILED_MISSING_KEY_MESSAGE = "The FlowFile key attribute was missing"; public static final String IGNITE_BATCH_FLOW_FILE_FAILED_ZERO_SIZE_MESSAGE = "The FlowFile size was zero"; - static { - descriptors = new ArrayList<>(); - descriptors.add(IGNITE_CONFIGURATION_FILE); - descriptors.add(CACHE_NAME); - descriptors.add(BATCH_SIZE); - descriptors.add(IGNITE_CACHE_ENTRY_KEY); - descriptors.add(DATA_STREAMER_PER_NODE_PARALLEL_OPERATIONS); - descriptors.add(DATA_STREAMER_PER_NODE_BUFFER_SIZE); - descriptors.add(DATA_STREAMER_AUTO_FLUSH_FREQUENCY); - descriptors.add(DATA_STREAMER_ALLOW_OVERRIDE); - } + /** + * Property descriptors + */ + protected static final List descriptors = + Arrays.asList(IGNITE_CONFIGURATION_FILE,CACHE_NAME,BATCH_SIZE, + IGNITE_CACHE_ENTRY_KEY, + DATA_STREAMER_PER_NODE_PARALLEL_OPERATIONS, + DATA_STREAMER_PER_NODE_BUFFER_SIZE, + DATA_STREAMER_AUTO_FLUSH_FREQUENCY,DATA_STREAMER_ALLOW_OVERRIDE); /** * Data streamer instance @@ -190,9 +181,13 @@ public class PutIgniteCache extends AbstractIgniteCacheProcessor { if (igniteDataStreamer != null) { getLogger().info("Closing ignite data streamer"); igniteDataStreamer.flush(); - igniteDataStreamer.close(); igniteDataStreamer = null; } + } + + @OnShutdown + public final void closeIgniteDataStreamerAndCache() { + closeIgniteDataStreamer(); super.closeIgniteCache(); } diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index d1da921940..931f94faaf 100644 --- a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -13,3 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. org.apache.nifi.processors.ignite.cache.PutIgniteCache +org.apache.nifi.processors.ignite.cache.GetIgniteCache diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITGetIgniteCache.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITGetIgniteCache.java new file mode 100644 index 0000000000..3adf407c31 --- /dev/null +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITGetIgniteCache.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.ignite.cache; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.ignite.IgniteCache; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class ITGetIgniteCache { + + private static final String CACHE_NAME = "testCache"; + private static TestRunner runner; + private static GetIgniteCache getIgniteCache; + private static Map properties1; + private static HashMap properties2; + + @BeforeClass + public static void setUp() throws IOException { + getIgniteCache = new GetIgniteCache(); + properties1 = new HashMap(); + properties2 = new HashMap(); + } + + @AfterClass + public static void teardown() { + runner = null; + IgniteCache cache = getIgniteCache.getIgniteCache(); + if (cache != null ) + cache.destroy(); + getIgniteCache = null; + } + + @Test + public void testgetIgniteCacheOnTriggerFileConfigurationOneFlowFile() throws IOException, InterruptedException { + runner = TestRunners.newTestRunner(getIgniteCache); + runner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME); + runner.setProperty(GetIgniteCache.IGNITE_CONFIGURATION_FILE, + "file:///" + new File(".").getAbsolutePath() + "/src/test/resources/test-ignite.xml"); + runner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}"); + + runner.assertValid(); + properties1.put("igniteKey", "key5"); + runner.enqueue("test5".getBytes(),properties1); + + getIgniteCache.initialize(runner.getProcessContext()); + + getIgniteCache.getIgniteCache().put("key5", "test5".getBytes()); + runner.run(1, false, true); + + runner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 1); + List sucessfulFlowFiles = runner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS); + assertEquals(1, sucessfulFlowFiles.size()); + List failureFlowFiles = runner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE); + assertEquals(0, failureFlowFiles.size()); + + final MockFlowFile out = runner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0); + + out.assertContentEquals("test5".getBytes()); + Assert.assertArrayEquals("test5".getBytes(),(byte[])getIgniteCache.getIgniteCache().get("key5")); + runner.shutdown(); + } + + @Test + public void testgetIgniteCacheOnTriggerNoConfigurationTwoFlowFile() throws IOException, InterruptedException { + runner = TestRunners.newTestRunner(getIgniteCache); + runner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME); + runner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}"); + + runner.assertValid(); + properties1.put("igniteKey", "key51"); + runner.enqueue("test1".getBytes(),properties1); + properties2.put("igniteKey", "key52"); + runner.enqueue("test2".getBytes(),properties2); + getIgniteCache.initialize(runner.getProcessContext()); + + getIgniteCache.getIgniteCache().put("key51", "test51".getBytes()); + getIgniteCache.getIgniteCache().put("key52", "test52".getBytes()); + runner.run(2, false, true); + + runner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 2); + List sucessfulFlowFiles = runner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS); + assertEquals(2, sucessfulFlowFiles.size()); + List failureFlowFiles = runner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE); + assertEquals(0, failureFlowFiles.size()); + + final MockFlowFile out = runner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0); + + out.assertContentEquals("test51".getBytes()); + Assert.assertArrayEquals("test51".getBytes(), + (byte[])getIgniteCache.getIgniteCache().get("key51")); + + final MockFlowFile out2 = runner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(1); + + out2.assertContentEquals("test52".getBytes()); + Assert.assertArrayEquals("test52".getBytes(), + (byte[])getIgniteCache.getIgniteCache().get("key52")); + + runner.shutdown(); + } + + @Test + public void testgetIgniteCacheOnTriggerNoConfigurationTwoFlowFileStopStart2Times() throws IOException, InterruptedException { + runner = TestRunners.newTestRunner(getIgniteCache); + runner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME); + runner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}"); + + runner.assertValid(); + properties1.put("igniteKey", "key51"); + runner.enqueue("test1".getBytes(),properties1); + properties2.put("igniteKey", "key52"); + runner.enqueue("test2".getBytes(),properties2); + getIgniteCache.initialize(runner.getProcessContext()); + + getIgniteCache.getIgniteCache().put("key51", "test51".getBytes()); + getIgniteCache.getIgniteCache().put("key52", "test52".getBytes()); + runner.run(2, false, true); + + runner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 2); + + getIgniteCache.closeIgniteCache(); + + runner.clearTransferState(); + + // reinit and check first time + runner.assertValid(); + properties1.put("igniteKey", "key51"); + runner.enqueue("test1".getBytes(),properties1); + properties2.put("igniteKey", "key52"); + runner.enqueue("test2".getBytes(),properties2); + getIgniteCache.initialize(runner.getProcessContext()); + + runner.run(2, false, true); + + runner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 2); + + getIgniteCache.closeIgniteCache(); + + runner.clearTransferState(); + + // reinit and check second time + runner.assertValid(); + properties1.put("igniteKey", "key51"); + runner.enqueue("test1".getBytes(),properties1); + properties2.put("igniteKey", "key52"); + runner.enqueue("test2".getBytes(),properties2); + getIgniteCache.initialize(runner.getProcessContext()); + + runner.run(2, false, true); + + runner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 2); + + getIgniteCache.closeIgniteCache(); + + runner.clearTransferState(); + + } +} diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITPutIgniteCache.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITPutIgniteCache.java index e8a469eef8..7c675363c9 100644 --- a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITPutIgniteCache.java +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITPutIgniteCache.java @@ -50,6 +50,7 @@ public class ITPutIgniteCache { @AfterClass public static void teardown() { runner = null; + putIgniteCache.getIgniteCache().destroy(); putIgniteCache = null; } @@ -82,8 +83,9 @@ public class ITPutIgniteCache { out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "0"); out.assertContentEquals("test".getBytes()); + System.out.println("Value was: " + new String((byte[])putIgniteCache.getIgniteCache().get("key5"))); Assert.assertArrayEquals("test".getBytes(),(byte[])putIgniteCache.getIgniteCache().get("key5")); - runner.shutdown(); + putIgniteCache.getIgniteCache().remove("key5"); } @Test @@ -115,6 +117,7 @@ public class ITPutIgniteCache { out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "0"); out.assertContentEquals("test1".getBytes()); + System.out.println("value was " + new String(putIgniteCache.getIgniteCache().get("key51"))); Assert.assertArrayEquals("test1".getBytes(),(byte[])putIgniteCache.getIgniteCache().get("key51")); final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS).get(1); @@ -126,7 +129,67 @@ public class ITPutIgniteCache { out2.assertContentEquals("test2".getBytes()); Assert.assertArrayEquals("test2".getBytes(),(byte[])putIgniteCache.getIgniteCache().get("key52")); + putIgniteCache.getIgniteCache().remove("key52"); + putIgniteCache.getIgniteCache().remove("key51"); + + } + + @Test + public void testPutIgniteCacheOnTriggerNoConfigurationTwoFlowFileStopAndStart2Times() throws IOException, InterruptedException { + runner = TestRunners.newTestRunner(putIgniteCache); + runner.setProperty(PutIgniteCache.BATCH_SIZE, "5"); + runner.setProperty(PutIgniteCache.CACHE_NAME, CACHE_NAME); + runner.setProperty(PutIgniteCache.DATA_STREAMER_PER_NODE_BUFFER_SIZE, "1"); + runner.setProperty(PutIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}"); + + runner.assertValid(); + properties1.put("igniteKey", "key51"); + runner.enqueue("test1".getBytes(),properties1); + properties2.put("igniteKey", "key52"); + runner.enqueue("test2".getBytes(),properties2); + runner.run(1, false, true); + putIgniteCache.getIgniteCache().remove("key51"); + putIgniteCache.getIgniteCache().remove("key52"); + + runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_SUCCESS, 2); + putIgniteCache.getIgniteCache().remove("key52"); + putIgniteCache.getIgniteCache().remove("key52"); + + // Close and restart first time + putIgniteCache.closeIgniteDataStreamer(); + + runner.clearTransferState(); + + putIgniteCache.initilizeIgniteDataStreamer(runner.getProcessContext()); + + runner.assertValid(); + properties1.put("igniteKey", "key51"); + runner.enqueue("test1".getBytes(),properties1); + properties2.put("igniteKey", "key52"); + runner.enqueue("test2".getBytes(),properties2); + runner.run(1, false, true); + + runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_SUCCESS, 2); + putIgniteCache.getIgniteCache().remove("key51"); + putIgniteCache.getIgniteCache().remove("key52"); + + // Close and restart second time + putIgniteCache.closeIgniteDataStreamer(); + + runner.clearTransferState(); + + putIgniteCache.initilizeIgniteDataStreamer(runner.getProcessContext()); + + runner.assertValid(); + properties1.put("igniteKey", "key51"); + runner.enqueue("test1".getBytes(),properties1); + properties2.put("igniteKey", "key52"); + runner.enqueue("test2".getBytes(),properties2); + runner.run(1, false, true); + + runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_SUCCESS, 2); + putIgniteCache.getIgniteCache().remove("key52"); + putIgniteCache.getIgniteCache().remove("key51"); - runner.shutdown(); } } diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestGetIgniteCache.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestGetIgniteCache.java new file mode 100644 index 0000000000..8692383ee8 --- /dev/null +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestGetIgniteCache.java @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.ignite.cache; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestGetIgniteCache { + + private static final String CACHE_NAME = "testCache"; + private TestRunner getRunner; + private GetIgniteCache getIgniteCache; + private Map properties1; + private Map properties2; + private static Ignite ignite; + + @BeforeClass + public static void setUpClass() { + ignite = Ignition.start("test-ignite.xml"); + + } + + @AfterClass + public static void tearDownClass() { + ignite.close(); + Ignition.stop(true); + } + + @Before + public void setUp() throws IOException { + getIgniteCache = new GetIgniteCache() { + @Override + protected Ignite getIgnite() { + return TestGetIgniteCache.ignite; + } + + }; + + properties1 = new HashMap(); + properties1.put("igniteKey", "key1"); + properties2 = new HashMap(); + properties2.put("igniteKey", "key2"); + + } + + @After + public void teardown() { + getRunner = null; + ignite.destroyCache(CACHE_NAME); + } + + @Test + public void testGetIgniteCacheDefaultConfOneFlowFileWithPlainKey() throws IOException, InterruptedException { + + getRunner = TestRunners.newTestRunner(getIgniteCache); + getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "mykey"); + + getRunner.assertValid(); + getRunner.enqueue(new byte[] {}); + + getIgniteCache.initialize(getRunner.getProcessContext()); + + getIgniteCache.getIgniteCache().put("mykey", "test".getBytes()); + + getRunner.run(1, false, true); + + getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 1); + List getSucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS); + assertEquals(1, getSucessfulFlowFiles.size()); + List getFailureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE); + assertEquals(0, getFailureFlowFiles.size()); + + final MockFlowFile getOut = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0); + getOut.assertContentEquals("test".getBytes()); + + getRunner.shutdown(); + } + + @Test + public void testGetIgniteCacheNullGetCacheThrowsException() throws IOException, InterruptedException { + + getIgniteCache = new GetIgniteCache() { + @Override + protected Ignite getIgnite() { + return TestGetIgniteCache.ignite; + } + + @Override + protected IgniteCache getIgniteCache() { + return null; + } + + }; + getRunner = TestRunners.newTestRunner(getIgniteCache); + getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "mykey"); + + getRunner.assertValid(); + getRunner.enqueue(new byte[] {}); + + getIgniteCache.initialize(getRunner.getProcessContext()); + + getRunner.run(1, false, true); + + getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_FAILURE, 1); + List getSucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS); + assertEquals(0, getSucessfulFlowFiles.size()); + List getFailureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE); + assertEquals(1, getFailureFlowFiles.size()); + + final MockFlowFile getOut = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE).get(0); + getOut.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, + GetIgniteCache.IGNITE_GET_FAILED_MESSAGE_PREFIX + "java.lang.NullPointerException"); + + getRunner.shutdown(); + } + + @Test + public void testGetIgniteCacheDefaultConfOneFlowFileWithKeyExpression() throws IOException, InterruptedException { + + getRunner = TestRunners.newTestRunner(getIgniteCache); + getRunner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME); + getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}"); + + getRunner.assertValid(); + getRunner.enqueue("".getBytes(),properties1); + + getIgniteCache.initialize(getRunner.getProcessContext()); + + getIgniteCache.getIgniteCache().put("key1", "test".getBytes()); + + getRunner.run(1, false, true); + + getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 1); + List sucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS); + assertEquals(1, sucessfulFlowFiles.size()); + List failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE); + assertEquals(0, failureFlowFiles.size()); + + final MockFlowFile out = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0); + + out.assertContentEquals("test".getBytes()); + getRunner.shutdown(); + } + + @Test + public void testGetIgniteCacheDefaultConfTwoFlowFilesWithExpressionKeys() throws IOException, InterruptedException { + + getRunner = TestRunners.newTestRunner(getIgniteCache); + getRunner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME); + getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}"); + + getRunner.assertValid(); + getRunner.enqueue("".getBytes(),properties1); + getRunner.enqueue("".getBytes(),properties2); + + getIgniteCache.initialize(getRunner.getProcessContext()); + + getIgniteCache.getIgniteCache().put("key1", "test1".getBytes()); + getIgniteCache.getIgniteCache().put("key2", "test2".getBytes()); + + getRunner.run(2, false, true); + + getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 2); + + List sucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS); + assertEquals(2, sucessfulFlowFiles.size()); + List failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE); + assertEquals(0, failureFlowFiles.size()); + + final MockFlowFile out1 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0); + + out1.assertContentEquals("test1".getBytes()); + Assert.assertEquals("test1",new String(getIgniteCache.getIgniteCache().get("key1"))); + + final MockFlowFile out2 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(1); + + out2.assertContentEquals("test2".getBytes()); + + Assert.assertArrayEquals("test2".getBytes(),(byte[])getIgniteCache.getIgniteCache().get("key2")); + + getRunner.shutdown(); + } + + @Test + public void testGetIgniteCacheDefaultConfOneFlowFileNoKey() throws IOException, InterruptedException { + + getRunner = TestRunners.newTestRunner(getIgniteCache); + getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}"); + + getRunner.assertValid(); + properties1.clear(); + getRunner.enqueue("".getBytes(),properties1); + getIgniteCache.initialize(getRunner.getProcessContext()); + + getRunner.run(1, false, true); + + getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_FAILURE, 1); + List sucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS); + assertEquals(0, sucessfulFlowFiles.size()); + List failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE); + assertEquals(1, failureFlowFiles.size()); + + final MockFlowFile out = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE).get(0); + + out.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, GetIgniteCache.IGNITE_GET_FAILED_MISSING_KEY_MESSAGE); + + getRunner.shutdown(); + } + + + + @Test + public void testGetIgniteCacheDefaultConfTwoFlowFilesNoKey() throws IOException, InterruptedException { + + getRunner = TestRunners.newTestRunner(getIgniteCache); + getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}"); + + getRunner.assertValid(); + + properties1.clear(); + getRunner.enqueue("".getBytes(),properties1); + getRunner.enqueue("".getBytes(),properties1); + + getIgniteCache.initialize(getRunner.getProcessContext()); + + getRunner.run(2, false, true); + + getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_FAILURE, 2); + List sucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS); + assertEquals(0, sucessfulFlowFiles.size()); + List failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE); + assertEquals(2, failureFlowFiles.size()); + + final MockFlowFile out1 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE).get(0); + out1.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, GetIgniteCache.IGNITE_GET_FAILED_MISSING_KEY_MESSAGE); + final MockFlowFile out2 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE).get(1); + out2.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, GetIgniteCache.IGNITE_GET_FAILED_MISSING_KEY_MESSAGE); + + getRunner.shutdown(); + + } + + @Test + public void testGetIgniteCacheDefaultConfTwoFlowFileFirstNoKey() throws IOException, InterruptedException { + + getRunner = TestRunners.newTestRunner(getIgniteCache); + getRunner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME); + getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}"); + + getRunner.assertValid(); + getRunner.enqueue("".getBytes()); + getRunner.enqueue("".getBytes(),properties2); + getIgniteCache.initialize(getRunner.getProcessContext()); + getIgniteCache.getIgniteCache().put("key2", "test2".getBytes()); + + getRunner.run(2, false, true); + + List sucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS); + assertEquals(1, sucessfulFlowFiles.size()); + List failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE); + assertEquals(1, failureFlowFiles.size()); + + final MockFlowFile out1 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE).get(0); + + out1.assertContentEquals("".getBytes()); + out1.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, GetIgniteCache.IGNITE_GET_FAILED_MISSING_KEY_MESSAGE); + + final MockFlowFile out2 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0); + + out2.assertContentEquals("test2".getBytes()); + Assert.assertArrayEquals("test2".getBytes(),(byte[])getIgniteCache.getIgniteCache().get("key2")); + + getRunner.shutdown(); + } + + @Test + public void testGetIgniteCacheDefaultConfTwoFlowFileSecondNoKey() throws IOException, InterruptedException { + + getRunner = TestRunners.newTestRunner(getIgniteCache); + getRunner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME); + getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}"); + + getRunner.assertValid(); + getRunner.enqueue("".getBytes(),properties1); + getRunner.enqueue("".getBytes()); + getIgniteCache.initialize(getRunner.getProcessContext()); + + getIgniteCache.getIgniteCache().put("key1", "test1".getBytes()); + getRunner.run(2, false, true); + + List sucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS); + assertEquals(1, sucessfulFlowFiles.size()); + List failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE); + assertEquals(1, failureFlowFiles.size()); + + final MockFlowFile out1 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE).get(0); + + out1.assertContentEquals("".getBytes()); + out1.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, GetIgniteCache.IGNITE_GET_FAILED_MISSING_KEY_MESSAGE); + + final MockFlowFile out2 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0); + + out2.assertContentEquals("test1".getBytes()); + Assert.assertArrayEquals("test1".getBytes(),(byte[])getIgniteCache.getIgniteCache().get("key1")); + + getRunner.shutdown(); + + } + + + @Test + public void testGetIgniteCacheDefaultConfThreeFlowFilesOneOkSecondOkThirdNoExpressionKey() throws IOException, InterruptedException { + + getRunner = TestRunners.newTestRunner(getIgniteCache); + getRunner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME); + getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}"); + + getRunner.assertValid(); + getRunner.enqueue("".getBytes(),properties1); + getRunner.enqueue("".getBytes(),properties2); + getRunner.enqueue("".getBytes()); + getIgniteCache.initialize(getRunner.getProcessContext()); + + getIgniteCache.getIgniteCache().put("key1", "test1".getBytes()); + getIgniteCache.getIgniteCache().put("key2", "test2".getBytes()); + getRunner.run(3, false, true); + + List sucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS); + assertEquals(2, sucessfulFlowFiles.size()); + List failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE); + assertEquals(1, failureFlowFiles.size()); + + final MockFlowFile out1 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE).get(0); + + out1.assertContentEquals("".getBytes()); + out1.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, GetIgniteCache.IGNITE_GET_FAILED_MISSING_KEY_MESSAGE); + + final MockFlowFile out2 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0); + + out2.assertContentEquals("test1".getBytes()); + Assert.assertArrayEquals("test1".getBytes(),(byte[])getIgniteCache.getIgniteCache().get("key1")); + + final MockFlowFile out3 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(1); + + out3.assertContentEquals("test2".getBytes()); + Assert.assertArrayEquals("test2".getBytes(),(byte[])getIgniteCache.getIgniteCache().get("key2")); + + getRunner.shutdown(); + + } + +} diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestPutIgniteCache.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestPutIgniteCache.java index 56397951ee..4d7e5c6dda 100644 --- a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestPutIgniteCache.java +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestPutIgniteCache.java @@ -47,13 +47,18 @@ public class TestPutIgniteCache { @BeforeClass public static void setUpClass() { - ignite = Ignition.start("test-ignite.xml"); + List grids = Ignition.allGrids(); + if ( grids.size() == 1 ) + ignite = grids.get(0); + else + ignite = Ignition.start("test-ignite.xml"); } @AfterClass public static void tearDownClass() { - ignite.close(); + if ( ignite != null ) + ignite.close(); Ignition.stop(true); }