From ecb8067883817c97dc4a54951171c37e36af2f03 Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Wed, 22 Nov 2017 11:36:30 -0500 Subject: [PATCH] NIFI-4428: Replaced JSON input with RecordReader using per-record flowfiles NIFI-4428: Added initial L&N, fixed dependency hierarchy NIFI-4428: Exposed additional ZK properties NIFI-4428: More work to reduce dependencies in Druid artifacts NIFI-4428: Removed inaccessible attribute NIFI-4428: Fixed typo in property description NIFI-4428: Updates to Druid NAR L&N NIFI-4428: Updated LICENSE to refer to ICU License for icu4j --- nifi-assembly/LICENSE | 90 +++ nifi-assembly/NOTICE | 155 +++++ .../record/util/DataTypeUtils.java | 86 ++- .../record/TestDataTypeUtils.java | 134 +++- .../pom.xml | 5 + .../src/main/resources/META-INF/LICENSE | 329 ++++++++++ .../src/main/resources/META-INF/NOTICE | 612 ++++++++++++++++++ .../nifi-druid-controller-service-api/pom.xml | 179 +++-- .../{ => druid}/DruidTranquilityService.java | 8 +- .../nifi-druid-controller-service/pom.xml | 57 +- .../DruidTranquilityController.java | 278 ++++++-- ...g.apache.nifi.controller.ControllerService | 2 +- .../DruidTranquilityControllerTest.java | 22 - .../druid/DruidTranquilityControllerTest.java | 100 +++ .../nifi-druid-bundle/nifi-druid-nar/pom.xml | 5 + .../src/main/resources/META-INF/LICENSE | 209 ++++++ .../src/main/resources/META-INF/NOTICE | 5 + .../nifi-druid-processors/pom.xml | 81 +-- .../org/apache/nifi/processors/PutDruid.java | 196 ------ .../nifi/processors/druid/PutDruidRecord.java | 373 +++++++++++ .../org.apache.nifi.processor.Processor | 2 +- .../druid/MockDruidTranquilityController.java | 150 +++++ .../apache/nifi/processors/PutDruidTest.java | 22 - .../processors/druid/PutDruidRecordTest.java | 113 ++++ nifi-nar-bundles/nifi-druid-bundle/pom.xml | 21 +- nifi-nar-bundles/pom.xml | 1 + 26 files changed, 2761 insertions(+), 474 deletions(-) create mode 100644 nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/src/main/resources/META-INF/LICENSE create mode 100644 nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/src/main/resources/META-INF/NOTICE rename nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/src/main/java/org/apache/nifi/controller/api/{ => druid}/DruidTranquilityService.java (87%) rename nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/{ => druid}/DruidTranquilityController.java (61%) delete mode 100644 nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/DruidTranquilityControllerTest.java create mode 100644 nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/druid/DruidTranquilityControllerTest.java create mode 100644 nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/src/main/resources/META-INF/LICENSE create mode 100644 nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/src/main/resources/META-INF/NOTICE delete mode 100644 nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java create mode 100644 nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/druid/PutDruidRecord.java create mode 100644 nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java delete mode 100644 nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/processors/PutDruidTest.java create mode 100644 nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/processors/druid/PutDruidRecordTest.java diff --git a/nifi-assembly/LICENSE b/nifi-assembly/LICENSE index 876082980f..0edf519b53 100644 --- a/nifi-assembly/LICENSE +++ b/nifi-assembly/LICENSE @@ -2073,3 +2073,93 @@ style license. WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + + The binary distribution of this product bundles 'ANTLR 4' which is available + under a "3-clause BSD" license. For details see http://www.antlr.org/license.html + + Copyright (c) 2012 Terence Parr and Sam Harwell + All rights reserved. + Redistribution and use in source and binary forms, with or without modification, are permitted + provided that the following conditions are met: + + Redistributions of source code must retain the above copyright notice, this list of + conditions and the following disclaimer. + Redistributions in binary form must reproduce the above copyright notice, this list of + conditions and the following disclaimer in the documentation and/or other materials + provided with the distribution. + + Neither the name of the author nor the names of its contributors may be used to endorse + or promote products derived from this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY + EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + The binary distribution of this product bundles 'icu4j' + which is available under a X-style license. + + COPYRIGHT AND PERMISSION NOTICE + + Copyright (c) 1995-2011 International Business Machines Corporation and others + + All rights reserved. + + Permission is hereby granted, free of charge, to any person obtaining a copy of this software + and associated documentation files (the "Software"), to deal in the Software without restriction, + including without limitation the rights to use, copy, modify, merge, publish, distribute, and/or + sell copies of the Software, and to permit persons to whom the Software is furnished to do so, + provided that the above copyright notice(s) and this permission notice appear in all copies of + the Software and that both the above copyright notice(s) and this permission notice appear in + supporting documentation. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING + BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT OF THIRD PARTY RIGHTS. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR HOLDERS INCLUDED + IN THIS NOTICE BE LIABLE FOR ANY CLAIM, OR ANY SPECIAL INDIRECT OR CONSEQUENTIAL DAMAGES, OR ANY + DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, + NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE + OF THIS SOFTWARE. + + Except as contained in this notice, the name of a copyright holder shall not be used in advertising + or otherwise to promote the sale, use or other dealings in this Software without prior written + authorization of the copyright holder. + + The binary distribution of this product bundles 'Scala Library, Scala Parser, Scala Reflect' which is available + under a "3-clause BSD" license. For details see https://github.com/scala/scala/blob/v2.11.7/doc/LICENSE.md) + + This software includes projects with other licenses -- see `doc/LICENSE.md`. + + Copyright (c) 2002-2017 EPFL + Copyright (c) 2011-2017 Lightbend, Inc. + + All rights reserved. + + Redistribution and use in source and binary forms, with or without modification, + are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + * Neither the name of the EPFL nor the names of its contributors + may be used to endorse or promote products derived from this software + without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE index ac5456a22a..3f2a24c5de 100644 --- a/nifi-assembly/NOTICE +++ b/nifi-assembly/NOTICE @@ -1553,6 +1553,160 @@ The following binary components are provided under the Apache Software License v The following NOTICE information applies: DataStax Java Driver for Apache Cassandra - Core Copyright (C) 2012-2017 DataStax Inc. + (ASLv2) bytebuffer-collections + The following NOTICE information applies: + bytebuffer-collections + Copyright 2011-2015 Metamarkets Group Inc. + + (ASLv2) Druid + The following NOTICE information applies: + Druid - a distributed column store. + Copyright 2012-2016 Metamarkets Group Inc. + Copyright 2015-2016 Yahoo! Inc. + Copyright 2015-2016 Imply Data, Inc. + + ------------------------------------------------------------------------------- + + This product contains a modified version of Andrew Duffy's java-alphanum library + * LICENSE: + * https://github.com/amjjd/java-alphanum/blob/5c036e2e492cc7f3b7bcdebd46b8f9e2a87927e5/LICENSE.txt (Apache License, Version 2.0) + * HOMEPAGE: + * https://github.com/amjjd/java-alphanum + + This product contains conjunctive normal form conversion code and a variance aggregator algorithm adapted from Apache Hive + * LICENSE: + * https://github.com/apache/hive/blob/branch-2.0/LICENSE (Apache License, Version 2.0) + * HOMEPAGE: + * https://github.com/apache/hive + + This product contains variable length long deserialization code adapted from Apache Lucene + * LICENSE: + * https://github.com/apache/lucene-solr/blob/master/lucene/LICENSE.txt (Apache License, Version 2.0) + * HOMEPAGE: + * https://github.com/apache/lucene-solr + + This product contains a modified version of Metamarkets java-util library + * LICENSE: + * https://github.com/metamx/java-util/blob/master/LICENSE (Apache License, Version 2.0) + * HOMEPAGE: + * https://github.com/metamx/java-util + * COMMIT TAG: + * https://github.com/metamx/java-util/commit/826021f + + This product contains a modified version of TestNG 6.8.7 + * LICENSE: + * http://testng.org/license/ (Apache License, Version 2.0) + * HOMEPAGE: + * http://testng.org/ + + This product contains a modified version of Metamarkets bytebuffer-collections library + * LICENSE: + * https://github.com/metamx/bytebuffer-collections/blob/master/LICENSE (Apache License, Version 2.0) + * HOMEPAGE: + * https://github.com/metamx/bytebuffer-collections + * COMMIT TAG: + * https://github.com/metamx/bytebuffer-collections/commit/3d1e7c8 + + This product contains SQL query planning code adapted from Apache Calcite + * LICENSE: + * https://github.com/apache/calcite/blob/master/LICENSE (Apache License, Version 2.0) + * HOMEPAGE: + * https://calcite.apache.org/ + + This product contains a modified version of Metamarkets extendedset library + * LICENSE: + * https://github.com/metamx/extendedset/blob/master/LICENSE (Apache License, Version 2.0) + * HOMEPAGE: + * https://github.com/metamx/extendedset + * COMMIT TAG: + * https://github.com/metamx/extendedset/commit/c9d647d + + This product contains a modified version of Alessandro Colantonio's CONCISE + (COmpressed 'N' Composable Integer SEt) library, extending the functionality of + ConciseSet to use IntBuffers. + * (c) 2010 Alessandro Colantonio + * + * + * LICENSE: + * Apache License, Version 2.0 + * HOMEPAGE: + * https://sourceforge.net/projects/concise/ + + This product contains a modified version of The Guava Authors's Closer class from Guava library: + * LICENSE: + * https://github.com/google/guava/blob/c462d69329709f72a17a64cb229d15e76e72199c/COPYING (Apache License, Version 2.0) + * HOMEPAGE: + * https://github.com/google/guava + * COMMIT TAG: + * https://github.com/google/guava/blob/c462d69329709f72a17a64cb229d15e76e72199c + + This product contains code adapted from Apache Hadoop + * LICENSE: + * https://github.com/apache/hadoop/blob/trunk/LICENSE.txt (Apache License, Version 2.0) + * HOMEPAGE: + * http://hadoop.apache.org/ + + (ASLv2) Emitter + The following NOTICE information applies: + emitter + Copyright 2012-2015 Metamarkets Group Inc. + + (ASLv2) Finagle + The following NOTICE information applies: + Copyright 2017, Twitter Inc. + This software contains portions of code from other open-source projects, including: + >>>>>>>>>>>>>> + Apache Thrift + Copyright 2006-2010 The Apache Software Foundation. + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + (ASLv2) java-util + The following NOTICE information applies: + java-util + Copyright 2011-2017 Metamarkets Group Inc. + + (ASLv2) jdbi + The following NOTICE information applies: + This product includes software developed by Brian McCallister + + (ASLv2) Joda Convert + The following NOTICE information applies: + Joda Convert + Copyright 2010-present Stephen Colebourne + + This product includes software developed by + Joda.org (http://www.joda.org/). + + + Joda-Convert includes code from Google Guava, which is licensed as follows: + + Copyright (C) 2011 The Guava Authors + + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + in compliance with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software distributed under the License + is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + or implied. See the License for the specific language governing permissions and limitations under + the License. + + (ASLv2) loglady + The following NOTICE information applies: + Copyright (c) 2013 Daniel Lundin + + (ASLv2) scala_util + The following NOTICE information applies: + Copyright 2012 Metamarkets Group Inc. + + (ASLv2) server-metrics + The following NOTICE information applies: + server-metrics + Copyright 2011-2015 Metamarkets Group Inc. + ************************ Common Development and Distribution License 1.1 @@ -1663,6 +1817,7 @@ Mozilla Public License v2.0 The following binary components are provided under the Mozilla Public License v2.0. See project link for details. (MPL 2.0) Saxon HE (net.sf.saxon:Saxon-HE:jar:9.6.0-5 - http://www.saxonica.com/) + (MPL 2.0) Rhino (org.mozilla:rhino:jar:1.7R5 - https://developer.mozilla.org/en-US/docs/Mozilla/Projects/Rhino/Download_Rhino) ***************** Mozilla Public License v1.1 diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index 6063d3b21c..145c17c839 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -45,12 +46,14 @@ import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.ArrayDataType; import org.apache.nifi.serialization.record.type.ChoiceDataType; +import org.apache.nifi.serialization.record.type.MapDataType; import org.apache.nifi.serialization.record.type.RecordDataType; public class DataTypeUtils { - // Regexes for parsing Floting-Point numbers + // Regexes for parsing Floating-Point numbers private static final String OptionalSign = "[\\-\\+]?"; private static final String Infinity = "(Infinity)"; private static final String NotANumber = "(NaN)"; @@ -326,6 +329,87 @@ public class DataTypeUtils { throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Map for field " + fieldName); } + /** + * Creates a native Java object from a given object of a specified type. Non-scalar (complex, nested, etc.) data types are processed iteratively/recursively, such that all + * included objects are native Java objects, rather than Record API objects or implementation-specific objects. + * @param value The object to be converted + * @param dataType The type of the provided object + * @return An object representing a native Java conversion of the given input object + */ + public static Object convertRecordFieldtoObject(final Object value, final DataType dataType) { + + if (value == null) { + return null; + } + + if (value instanceof Record) { + Record record = (Record) value; + RecordSchema recordSchema = record.getSchema(); + if (recordSchema == null) { + throw new IllegalTypeConversionException("Cannot convert value of type Record to Map because Record does not have an associated Schema"); + } + final Map recordMap = new HashMap<>(); + for (RecordField field : recordSchema.getFields()) { + final DataType fieldDataType = field.getDataType(); + final String fieldName = field.getFieldName(); + Object fieldValue = record.getValue(fieldName); + if (fieldValue == null) { + recordMap.put(fieldName, null); + } else if (isScalarValue(fieldDataType, fieldValue)) { + recordMap.put(fieldName, fieldValue); + + } else if (fieldDataType instanceof RecordDataType) { + Record nestedRecord = (Record) fieldValue; + recordMap.put(fieldName, convertRecordFieldtoObject(nestedRecord, fieldDataType)); + + } else if (fieldDataType instanceof MapDataType) { + recordMap.put(fieldName, convertRecordMapToJavaMap((Map) fieldValue, ((MapDataType)fieldDataType).getValueType())); + + } else if (fieldDataType instanceof ArrayDataType) { + recordMap.put(fieldName, convertRecordArrayToJavaArray((Object[])fieldValue, ((ArrayDataType) fieldDataType).getElementType())); + } else { + throw new IllegalTypeConversionException("Cannot convert value [" + fieldValue + "] of type " + fieldDataType.toString() + + " to Map for field " + fieldName + " because the type is not supported"); + } + } + return recordMap; + } else if (value instanceof Map) { + return convertRecordMapToJavaMap((Map)value, ((MapDataType)dataType).getValueType()); + } else if (dataType != null && isScalarValue(dataType, value)) { + return value; + } + + throw new IllegalTypeConversionException("Cannot convert value of class " + value.getClass().getName() + " because the type is not supported"); + } + + + public static Map convertRecordMapToJavaMap(final Map map, DataType valueDataType) { + + if (map == null) { + return null; + } + + Map resultMap = new LinkedHashMap<>(); + for (Map.Entry entry : map.entrySet()) { + resultMap.put(entry.getKey(), convertRecordFieldtoObject(entry.getValue(), valueDataType)); + } + return resultMap; + } + + public static Object[] convertRecordArrayToJavaArray(final Object[] array, DataType elementDataType) { + + if (array == null || array.length == 0 || isScalarValue(elementDataType, array[0])) { + return array; + } else { + // Must be an array of complex types, build an array of converted values + Object[] resultArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + resultArray[i] = convertRecordFieldtoObject(array[i], elementDataType); + } + return resultArray; + } + } + public static boolean isMapTypeCompatible(final Object value) { return value != null && value instanceof Map; } diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java index 6666651af8..a239ea7550 100644 --- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java +++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java @@ -17,11 +17,21 @@ package org.apache.nifi.serialization.record; +import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.util.DataTypeUtils; -import org.junit.Assert; import org.junit.Test; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class TestDataTypeUtils { /** @@ -33,12 +43,126 @@ public class TestDataTypeUtils { java.util.Date date = new java.util.Date(); Timestamp ts = DataTypeUtils.toTimestamp(date, null, null); - Assert.assertNotNull(ts); - Assert.assertEquals("Times didn't match", ts.getTime(), date.getTime()); + assertNotNull(ts); + assertEquals("Times didn't match", ts.getTime(), date.getTime()); java.sql.Date sDate = new java.sql.Date(date.getTime()); ts = DataTypeUtils.toTimestamp(date, null, null); - Assert.assertNotNull(ts); - Assert.assertEquals("Times didn't match", ts.getTime(), sDate.getTime()); + assertNotNull(ts); + assertEquals("Times didn't match", ts.getTime(), sDate.getTime()); + } + + @Test + public void testConvertRecordMapToJavaMap() { + assertNull(DataTypeUtils.convertRecordMapToJavaMap(null, null)); + assertNull(DataTypeUtils.convertRecordMapToJavaMap(null, RecordFieldType.MAP.getDataType())); + Map resultMap = DataTypeUtils.convertRecordMapToJavaMap(new HashMap<>(), RecordFieldType.MAP.getDataType()); + assertNotNull(resultMap); + assertTrue(resultMap.isEmpty()); + + int[] intArray = {3,2,1}; + + Map inputMap = new HashMap() {{ + put("field1", "hello"); + put("field2", 1); + put("field3", intArray); + }}; + + resultMap = DataTypeUtils.convertRecordMapToJavaMap(inputMap, RecordFieldType.STRING.getDataType()); + assertNotNull(resultMap); + assertFalse(resultMap.isEmpty()); + assertEquals("hello", resultMap.get("field1")); + assertEquals(1, resultMap.get("field2")); + assertTrue(resultMap.get("field3") instanceof int[]); + assertNull(resultMap.get("field4")); + + } + + @Test + public void testConvertRecordArrayToJavaArray() { + assertNull(DataTypeUtils.convertRecordArrayToJavaArray(null, null)); + assertNull(DataTypeUtils.convertRecordArrayToJavaArray(null, RecordFieldType.STRING.getDataType())); + String[] stringArray = {"Hello", "World!"}; + Object[] resultArray = DataTypeUtils.convertRecordArrayToJavaArray(stringArray, RecordFieldType.STRING.getDataType()); + assertNotNull(resultArray); + for(Object o : resultArray) { + assertTrue(o instanceof String); + } + } + + @Test + public void testConvertRecordFieldToObject() { + assertNull(DataTypeUtils.convertRecordFieldtoObject(null, null)); + assertNull(DataTypeUtils.convertRecordFieldtoObject(null, RecordFieldType.MAP.getDataType())); + + final List fields = new ArrayList<>(); + fields.add(new RecordField("defaultOfHello", RecordFieldType.STRING.getDataType(), "hello")); + fields.add(new RecordField("noDefault", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType()))); + fields.add(new RecordField("intField", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("intArray", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()))); + + // Map of Records with Arrays + List nestedRecordFields = new ArrayList<>(); + nestedRecordFields.add(new RecordField("a", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()))); + nestedRecordFields.add(new RecordField("b", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()))); + RecordSchema nestedRecordSchema = new SimpleRecordSchema(nestedRecordFields); + + fields.add(new RecordField("complex", RecordFieldType.MAP.getMapDataType(RecordFieldType.RECORD.getRecordDataType(nestedRecordSchema)))); + + final RecordSchema schema = new SimpleRecordSchema(fields); + final Map values = new HashMap<>(); + values.put("noDefault", "world"); + values.put("intField", 5); + values.put("intArray", new Integer[] {3,2,1}); + final Map complexValues = new HashMap<>(); + + final Map complexValueRecord1 = new HashMap<>(); + complexValueRecord1.put("a",new Integer[] {3,2,1}); + complexValueRecord1.put("b",new Integer[] {5,4,3}); + + final Map complexValueRecord2 = new HashMap<>(); + complexValueRecord2.put("a",new String[] {"hello","world!"}); + complexValueRecord2.put("b",new String[] {"5","4","3"}); + + complexValues.put("complex1", DataTypeUtils.toRecord(complexValueRecord1, nestedRecordSchema, "complex1")); + complexValues.put("complex2", DataTypeUtils.toRecord(complexValueRecord2, nestedRecordSchema, "complex2")); + + values.put("complex", complexValues); + final Record inputRecord = new MapRecord(schema, values); + + Object o = DataTypeUtils.convertRecordFieldtoObject(inputRecord, RecordFieldType.RECORD.getRecordDataType(schema)); + assertTrue(o instanceof Map); + Map outputMap = (Map) o; + assertEquals("hello", outputMap.get("defaultOfHello")); + assertEquals("world", outputMap.get("noDefault")); + o = outputMap.get("intField"); + assertEquals(5,o); + o = outputMap.get("intArray"); + assertTrue(o instanceof Integer[]); + Integer[] intArray = (Integer[])o; + assertEquals(3, intArray.length); + assertEquals((Integer)3, intArray[0]); + o = outputMap.get("complex"); + assertTrue(o instanceof Map); + Map nestedOutputMap = (Map)o; + o = nestedOutputMap.get("complex1"); + assertTrue(o instanceof Map); + Map complex1 = (Map)o; + o = complex1.get("a"); + assertTrue(o instanceof Integer[]); + assertEquals((Integer)2, ((Integer[])o)[1]); + o = complex1.get("b"); + assertTrue(o instanceof Integer[]); + assertEquals((Integer)3, ((Integer[])o)[2]); + o = nestedOutputMap.get("complex2"); + assertTrue(o instanceof Map); + Map complex2 = (Map)o; + o = complex2.get("a"); + assertTrue(o instanceof String[]); + assertEquals("hello", ((String[])o)[0]); + o = complex2.get("b"); + assertTrue(o instanceof String[]); + assertEquals("4", ((String[])o)[1]); + } } diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/pom.xml index 6f4f606426..bc400fbc6c 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/pom.xml +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/pom.xml @@ -26,6 +26,11 @@ nar + + org.apache.nifi + nifi-standard-services-api-nar + nar + org.apache.nifi nifi-druid-controller-service-api diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000000..9efc25327c --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,329 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +APACHE NIFI SUBCOMPONENTS: + +The Apache NiFi project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for the these +subcomponents is subject to the terms and conditions of the following +licenses. + +The binary distribution of this product bundles 'ANTLR 4' which is available + under a "3-clause BSD" license. For details see http://www.antlr.org/license.html + + Copyright (c) 2012 Terence Parr and Sam Harwell + All rights reserved. + Redistribution and use in source and binary forms, with or without modification, are permitted + provided that the following conditions are met: + + Redistributions of source code must retain the above copyright notice, this list of + conditions and the following disclaimer. + Redistributions in binary form must reproduce the above copyright notice, this list of + conditions and the following disclaimer in the documentation and/or other materials + provided with the distribution. + + Neither the name of the author nor the names of its contributors may be used to endorse + or promote products derived from this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY + EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +The binary distribution of this product bundles 'ParaNamer' + which is available under a BSD style license. + + Copyright (c) 2006 Paul Hammant & ThoughtWorks Inc + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions + are met: + 1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + 3. Neither the name of the copyright holders nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + THE POSSIBILITY OF SUCH DAMAGE. + +The binary distribution of this product bundles 'icu4j' + which is available under the ICU License. + + COPYRIGHT AND PERMISSION NOTICE + + Copyright (c) 1995-2011 International Business Machines Corporation and others + + All rights reserved. + + Permission is hereby granted, free of charge, to any person obtaining a copy of this software + and associated documentation files (the "Software"), to deal in the Software without restriction, + including without limitation the rights to use, copy, modify, merge, publish, distribute, and/or + sell copies of the Software, and to permit persons to whom the Software is furnished to do so, + provided that the above copyright notice(s) and this permission notice appear in all copies of + the Software and that both the above copyright notice(s) and this permission notice appear in + supporting documentation. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING + BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT OF THIRD PARTY RIGHTS. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR HOLDERS INCLUDED + IN THIS NOTICE BE LIABLE FOR ANY CLAIM, OR ANY SPECIAL INDIRECT OR CONSEQUENTIAL DAMAGES, OR ANY + DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, + NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE + OF THIS SOFTWARE. + + Except as contained in this notice, the name of a copyright holder shall not be used in advertising + or otherwise to promote the sale, use or other dealings in this Software without prior written + authorization of the copyright holder. + +The binary distribution of this product bundles 'Scala Library, Scala Parser, Scala Reflect' which is available + under a "3-clause BSD" license. For details see https://github.com/scala/scala/blob/v2.11.7/doc/LICENSE.md) + + This software includes projects with other licenses -- see `doc/LICENSE.md`. + + Copyright (c) 2002-2017 EPFL + Copyright (c) 2011-2017 Lightbend, Inc. + + All rights reserved. + + Redistribution and use in source and binary forms, with or without modification, + are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + * Neither the name of the EPFL nor the names of its contributors + may be used to endorse or promote products derived from this software + without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..1ac342e748 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,612 @@ +nifi-druid-controller-service-api-nar +Copyright 2015-2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache HttpComponents Client + The following NOTICE information applies: + Apache HttpComponents Client + Copyright 1999-2016 The Apache Software Foundation + + (ASLv2) Jackson JSON processor + The following NOTICE information applies: + # Jackson JSON processor + + Jackson is a high-performance, Free/Open Source JSON processing library. + It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has + been in development since 2007. + It is currently developed by a community of developers, as well as supported + commercially by FasterXML.com. + + ## Licensing + + Jackson core and extension components may licensed under different licenses. + To find the details that apply to this artifact see the accompanying LICENSE file. + For more information, including possible other licensing options, contact + FasterXML.com (http://fasterxml.com). + + ## Credits + + A list of contributors may be found from CREDITS file, which is included + in some artifacts (usually source distributions); but is always available + from the source code management (SCM) system project uses. + + (ASLv2) bytebuffer-collections + The following NOTICE information applies: + bytebuffer-collections + Copyright 2011-2015 Metamarkets Group Inc. + + (ASLv2) Apache Commons IO + The following NOTICE information applies: + Apache Commons IO + Copyright 2002-2016 The Apache Software Foundation + + (ASLv2) Apache Commons Collections + The following NOTICE information applies: + Apache Commons Collections + Copyright 2001-2013 The Apache Software Foundation + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2014 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + + (ASLv2) Apache Commons Codec + The following NOTICE information applies: + Apache Commons Codec + Copyright 2002-2014 The Apache Software Foundation + + src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java + contains test data from http://aspell.net/test/orig/batch0.tab. + Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org) + + =============================================================================== + + The content of package org.apache.commons.codec.language.bm has been translated + from the original php source code available at http://stevemorse.org/phoneticinfo.htm + with permission from the original authors. + Original source copyright: + Copyright (c) 2008 Alexander Beider & Stephen P. Morse. + + (ASLv2) Apache Commons Logging + The following NOTICE information applies: + Apache Commons Logging + Copyright 2003-2013 The Apache Software Foundation + + (ASLv2) Apache Commons DBCP + The following NOTICE information applies: + Apache Commons DBCP + Copyright 2001-2015 The Apache Software Foundation. + + (ASLv2) Apache Commons Pool + The following NOTICE information applies: + Apache Commons Pool + Copyright 1999-2009 The Apache Software Foundation. + + (ASLv2) Apache Curator + The following NOTICE information applies: + Curator Framework + Copyright 2011-2014 The Apache Software Foundation + + Curator Client + Copyright 2011-2014 The Apache Software Foundation + + Curator Recipes + Copyright 2011-2014 The Apache Software Foundation + + Curator X-Discovery + Copyright 2013-2014 The Apache Software Foundation + + (ASLv2) Druid + The following NOTICE information applies: + Druid - a distributed column store. + Copyright 2012-2016 Metamarkets Group Inc. + Copyright 2015-2016 Yahoo! Inc. + Copyright 2015-2016 Imply Data, Inc. + + ------------------------------------------------------------------------------- + + This product contains a modified version of Andrew Duffy's java-alphanum library + * LICENSE: + * https://github.com/amjjd/java-alphanum/blob/5c036e2e492cc7f3b7bcdebd46b8f9e2a87927e5/LICENSE.txt (Apache License, Version 2.0) + * HOMEPAGE: + * https://github.com/amjjd/java-alphanum + + This product contains conjunctive normal form conversion code and a variance aggregator algorithm adapted from Apache Hive + * LICENSE: + * https://github.com/apache/hive/blob/branch-2.0/LICENSE (Apache License, Version 2.0) + * HOMEPAGE: + * https://github.com/apache/hive + + This product contains variable length long deserialization code adapted from Apache Lucene + * LICENSE: + * https://github.com/apache/lucene-solr/blob/master/lucene/LICENSE.txt (Apache License, Version 2.0) + * HOMEPAGE: + * https://github.com/apache/lucene-solr + + This product contains a modified version of Metamarkets java-util library + * LICENSE: + * https://github.com/metamx/java-util/blob/master/LICENSE (Apache License, Version 2.0) + * HOMEPAGE: + * https://github.com/metamx/java-util + * COMMIT TAG: + * https://github.com/metamx/java-util/commit/826021f + + This product contains a modified version of TestNG 6.8.7 + * LICENSE: + * http://testng.org/license/ (Apache License, Version 2.0) + * HOMEPAGE: + * http://testng.org/ + + This product contains a modified version of Metamarkets bytebuffer-collections library + * LICENSE: + * https://github.com/metamx/bytebuffer-collections/blob/master/LICENSE (Apache License, Version 2.0) + * HOMEPAGE: + * https://github.com/metamx/bytebuffer-collections + * COMMIT TAG: + * https://github.com/metamx/bytebuffer-collections/commit/3d1e7c8 + + This product contains SQL query planning code adapted from Apache Calcite + * LICENSE: + * https://github.com/apache/calcite/blob/master/LICENSE (Apache License, Version 2.0) + * HOMEPAGE: + * https://calcite.apache.org/ + + This product contains a modified version of Metamarkets extendedset library + * LICENSE: + * https://github.com/metamx/extendedset/blob/master/LICENSE (Apache License, Version 2.0) + * HOMEPAGE: + * https://github.com/metamx/extendedset + * COMMIT TAG: + * https://github.com/metamx/extendedset/commit/c9d647d + + This product contains a modified version of Alessandro Colantonio's CONCISE + (COmpressed 'N' Composable Integer SEt) library, extending the functionality of + ConciseSet to use IntBuffers. + * (c) 2010 Alessandro Colantonio + * + * + * LICENSE: + * Apache License, Version 2.0 + * HOMEPAGE: + * https://sourceforge.net/projects/concise/ + + This product contains a modified version of The Guava Authors's Closer class from Guava library: + * LICENSE: + * https://github.com/google/guava/blob/c462d69329709f72a17a64cb229d15e76e72199c/COPYING (Apache License, Version 2.0) + * HOMEPAGE: + * https://github.com/google/guava + * COMMIT TAG: + * https://github.com/google/guava/blob/c462d69329709f72a17a64cb229d15e76e72199c + + This product contains code adapted from Apache Hadoop + * LICENSE: + * https://github.com/apache/hadoop/blob/trunk/LICENSE.txt (Apache License, Version 2.0) + * HOMEPAGE: + * http://hadoop.apache.org/ + + (ASLv2) Emitter + The following NOTICE information applies: + emitter + Copyright 2012-2015 Metamarkets Group Inc. + + (ASLv2) Finagle + The following NOTICE information applies: + Copyright 2017, Twitter Inc. + This software contains portions of code from other open-source projects, including: + >>>>>>>>>>>>>> + Apache Thrift + Copyright 2006-2010 The Apache Software Foundation. + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + (ASLv2) Guava + The following NOTICE information applies: + Guava + Copyright 2015 The Guava Authors + + (ASLv2) Google Guice + The following NOTICE information applies: + Google Guice - Core Library + Copyright 2006-2011 Google, Inc. + + (ASLv2) Jackson JSON processor + The following NOTICE information applies: + # Jackson JSON processor + + Jackson is a high-performance, Free/Open Source JSON processing library. + It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has + been in development since 2007. + It is currently developed by a community of developers, as well as supported + commercially by FasterXML.com. + + ## Licensing + + Jackson core and extension components may licensed under different licenses. + To find the details that apply to this artifact see the accompanying LICENSE file. + For more information, including possible other licensing options, contact + FasterXML.com (http://fasterxml.com). + + ## Credits + + A list of contributors may be found from CREDITS file, which is included + in some artifacts (usually source distributions); but is always available + from the source code management (SCM) system project uses. + + (ASLv2) java-util + The following NOTICE information applies: + java-util + Copyright 2011-2017 Metamarkets Group Inc. + + (ASLv2) jdbi + The following NOTICE information applies: + This product includes software developed by Brian McCallister + + (ASLv2) Joda Time + The following NOTICE information applies: + This product includes software developed by + Joda.org (http://www.joda.org/). + + (ASLv2) Joda Convert + The following NOTICE information applies: + Joda Convert + Copyright 2010-present Stephen Colebourne + + This product includes software developed by + Joda.org (http://www.joda.org/). + + + Joda-Convert includes code from Google Guava, which is licensed as follows: + + Copyright (C) 2011 The Guava Authors + + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + in compliance with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software distributed under the License + is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + or implied. See the License for the specific language governing permissions and limitations under + the License. + + (ASLv2) Apache log4j + The following NOTICE information applies: + Apache log4j + Copyright 2007 The Apache Software Foundation + + (ASLv2) loglady + The following NOTICE information applies: + Copyright (c) 2013 Daniel Lundin + + (ASLv2) The Netty Project + The following NOTICE information applies: + + The Netty Project + ================= + + Please visit the Netty web site for more information: + + * http://netty.io/ + + Copyright 2014 The Netty Project + + The Netty Project licenses this file to you under the Apache License, + version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at: + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + License for the specific language governing permissions and limitations + under the License. + + Also, please refer to each LICENSE..txt file, which is located in + the 'license' directory of the distribution file, for the license terms of the + components that this product depends on. + + ------------------------------------------------------------------------------- + This product contains the extensions to Java Collections Framework which has + been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene: + + * LICENSE: + * license/LICENSE.jsr166y.txt (Public Domain) + * HOMEPAGE: + * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/ + * http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/ + + This product contains a modified version of Robert Harder's Public Domain + Base64 Encoder and Decoder, which can be obtained at: + + * LICENSE: + * license/LICENSE.base64.txt (Public Domain) + * HOMEPAGE: + * http://iharder.sourceforge.net/current/java/base64/ + + This product contains a modified portion of 'Webbit', an event based + WebSocket and HTTP server, which can be obtained at: + + * LICENSE: + * license/LICENSE.webbit.txt (BSD License) + * HOMEPAGE: + * https://github.com/joewalnes/webbit + + This product contains a modified portion of 'SLF4J', a simple logging + facade for Java, which can be obtained at: + + * LICENSE: + * license/LICENSE.slf4j.txt (MIT License) + * HOMEPAGE: + * http://www.slf4j.org/ + + This product contains a modified portion of 'Apache Harmony', an open source + Java SE, which can be obtained at: + + * LICENSE: + * license/LICENSE.harmony.txt (Apache License 2.0) + * HOMEPAGE: + * http://archive.apache.org/dist/harmony/ + + This product contains a modified portion of 'jbzip2', a Java bzip2 compression + and decompression library written by Matthew J. Francis. It can be obtained at: + + * LICENSE: + * license/LICENSE.jbzip2.txt (MIT License) + * HOMEPAGE: + * https://code.google.com/p/jbzip2/ + + This product contains a modified portion of 'libdivsufsort', a C API library to construct + the suffix array and the Burrows-Wheeler transformed string for any input string of + a constant-size alphabet written by Yuta Mori. It can be obtained at: + + * LICENSE: + * license/LICENSE.libdivsufsort.txt (MIT License) + * HOMEPAGE: + * https://github.com/y-256/libdivsufsort + + This product contains a modified portion of Nitsan Wakart's 'JCTools', Java Concurrency Tools for the JVM, + which can be obtained at: + + * LICENSE: + * license/LICENSE.jctools.txt (ASL2 License) + * HOMEPAGE: + * https://github.com/JCTools/JCTools + + This product optionally depends on 'JZlib', a re-implementation of zlib in + pure Java, which can be obtained at: + + * LICENSE: + * license/LICENSE.jzlib.txt (BSD style License) + * HOMEPAGE: + * http://www.jcraft.com/jzlib/ + + This product optionally depends on 'Compress-LZF', a Java library for encoding and + decoding data in LZF format, written by Tatu Saloranta. It can be obtained at: + + * LICENSE: + * license/LICENSE.compress-lzf.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/ning/compress + + This product optionally depends on 'lz4', a LZ4 Java compression + and decompression library written by Adrien Grand. It can be obtained at: + + * LICENSE: + * license/LICENSE.lz4.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/jpountz/lz4-java + + This product optionally depends on 'lzma-java', a LZMA Java compression + and decompression library, which can be obtained at: + + * LICENSE: + * license/LICENSE.lzma-java.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/jponge/lzma-java + + This product contains a modified portion of 'jfastlz', a Java port of FastLZ compression + and decompression library written by William Kinney. It can be obtained at: + + * LICENSE: + * license/LICENSE.jfastlz.txt (MIT License) + * HOMEPAGE: + * https://code.google.com/p/jfastlz/ + + This product contains a modified portion of and optionally depends on 'Protocol Buffers', Google's data + interchange format, which can be obtained at: + + * LICENSE: + * license/LICENSE.protobuf.txt (New BSD License) + * HOMEPAGE: + * https://github.com/google/protobuf + + This product optionally depends on 'Bouncy Castle Crypto APIs' to generate + a temporary self-signed X.509 certificate when the JVM does not provide the + equivalent functionality. It can be obtained at: + + * LICENSE: + * license/LICENSE.bouncycastle.txt (MIT License) + * HOMEPAGE: + * http://www.bouncycastle.org/ + + This product optionally depends on 'Snappy', a compression library produced + by Google Inc, which can be obtained at: + + * LICENSE: + * license/LICENSE.snappy.txt (New BSD License) + * HOMEPAGE: + * https://github.com/google/snappy + + This product optionally depends on 'JBoss Marshalling', an alternative Java + serialization API, which can be obtained at: + + * LICENSE: + * license/LICENSE.jboss-marshalling.txt (GNU LGPL 2.1) + * HOMEPAGE: + * http://www.jboss.org/jbossmarshalling + + This product optionally depends on 'Caliper', Google's micro- + benchmarking framework, which can be obtained at: + + * LICENSE: + * license/LICENSE.caliper.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/google/caliper + + This product optionally depends on 'Apache Commons Logging', a logging + framework, which can be obtained at: + + * LICENSE: + * license/LICENSE.commons-logging.txt (Apache License 2.0) + * HOMEPAGE: + * http://commons.apache.org/logging/ + + This product optionally depends on 'Apache Log4J', a logging framework, which + can be obtained at: + + * LICENSE: + * license/LICENSE.log4j.txt (Apache License 2.0) + * HOMEPAGE: + * http://logging.apache.org/log4j/ + + This product optionally depends on 'Aalto XML', an ultra-high performance + non-blocking XML processor, which can be obtained at: + + * LICENSE: + * license/LICENSE.aalto-xml.txt (Apache License 2.0) + * HOMEPAGE: + * http://wiki.fasterxml.com/AaltoHome + + This product contains a modified version of 'HPACK', a Java implementation of + the HTTP/2 HPACK algorithm written by Twitter. It can be obtained at: + + * LICENSE: + * license/LICENSE.hpack.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/twitter/hpack + + This product contains a modified portion of 'Apache Commons Lang', a Java library + provides utilities for the java.lang API, which can be obtained at: + + * LICENSE: + * license/LICENSE.commons-lang.txt (Apache License 2.0) + * HOMEPAGE: + * https://commons.apache.org/proper/commons-lang/ + + This product contains a forked and modified version of Tomcat Native + + * LICENSE: + * ASL2 + * HOMEPAGE: + * http://tomcat.apache.org/native-doc/ + * https://svn.apache.org/repos/asf/tomcat/native/ + + (ASLv2) opencsv (net.sf.opencsv:opencsv:2.3) + + (ASLv2) scala_util + The following NOTICE information applies: + Copyright 2012 Metamarkets Group Inc. + + (ASLv2) server-metrics + The following NOTICE information applies: + server-metrics + Copyright 2011-2015 Metamarkets Group Inc. + + (ASLv2) Apache ZooKeeper + The following NOTICE information applies: + Apache ZooKeeper + Copyright 2009-2012 The Apache Software Foundation + +************************ +Eclipse Public License 1.0 +************************ + +The following binary components are provided under the Eclipse Public License 1.0. See project link for details. + + (EPL 1.0)(MPL 2.0) H2 Database (com.h2database:h2:jar:1.3.176 - http://www.h2database.com/html/license.html) + + +************************ +Mozilla Public License 2.0 +************************ + +The following binary components are provided under the Mozilla Public License 2.0. See project link for details. + + (MPL 2.0) Rhino (org.mozilla:rhino:jar:1.7R5 - https://developer.mozilla.org/en-US/docs/Mozilla/Projects/Rhino/Download_Rhino) + + +************************ +Common Development and Distribution License 1.1 +************************ + +The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details. + + (CDDL 1.1) (GPL2 w/ CPE) jersey-client (org.glassfish.jersey.core:jersey-client:jar:2.26 - https://jersey.github.io/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-common (org.glassfish.jersey.core:jersey-common:jar:2.26 - https://jersey.github.io/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-container-servlet-core (org.glassfish.jersey.containers:jersey-container-servlet-core:jar:2.26 - https://jersey.github.io/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-entity-filtering (org.glassfish.jersey.ext:jersey-entity-filtering:jar:2.26 - https://jersey.github.io/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-hk2 (org.glassfish.jersey.inject:jersey-hk2:jar:2.26 - https://jersey.github.io/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-media-jaxb (org.glassfish.jersey.media:jersey-media-jaxb:jar:2.26 - https://jersey.github.io/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-media-json-jackson (org.glassfish.jersey.media:jersey-media-json-jackson:jar:2.26 - https://jersey.github.io/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-media-multipart (org.glassfish.jersey.media:jersey-media-multipart:jar:2.26 - https://jersey.github.io/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-server (org.glassfish.jersey.core:jersey-server:jar:2.26 - https://jersey.github.io/) + (CDDL 1.1) (GPL2 w/ CPE) hk2 (org.glassfish.hk2:hk2:jar:2.5.0-b42 - https://javaee.github.io/glassfish/) + (CDDL 1.1) (GPL2 w/ CPE) hk2-api (org.glassfish.hk2:hk2-api:jar:2.5.0-b42 - https://javaee.github.io/glassfish/) + (CDDL 1.1) (GPL2 w/ CPE) hk2-utils (org.glassfish.hk2:hk2-utils:jar:2.5.0-b42 - https://javaee.github.io/glassfish/) + (CDDL 1.1) (GPL2 w/ CPE) hk2-locator (org.glassfish.hk2:hk2-locator:jar:2.5.0-b42 - https://javaee.github.io/glassfish/) + (CDDL 1.1) (GPL2 w/ CPE) hk2-config (org.glassfish.hk2:hk2-config:jar:2.5.0-b42 - https://javaee.github.io/glassfish/) + (CDDL 1.1) (GPL2 w/ CPE) hk2-core (org.glassfish.hk2:hk2-core:jar:2.5.0-b42 - https://javaee.github.io/glassfish/) + (CDDL 1.1) (GPL2 w/ CPE) hk2-runlevel (org.glassfish.hk2:hk2-runlevel:jar:2.5.0-b42 - https://javaee.github.io/glassfish/) + (CDDL 1.1) (GPL2 w/ CPE) config-types (org.glassfish.hk2:config-types:jar:2.5.0-b42 - https://javaee.github.io/glassfish/) + (CDDL 1.1) (GPL2 w/ CPE) class-model (org.glassfish.hk2:class-model:jar:2.5.0-b42 - https://javaee.github.io/glassfish/) + (CDDL 1.1) (GPL2 w/ CPE) asm-all-repackaged (org.glassfish.hk2.external:asm-all-repackaged:jar:2.5.0-b42 - https://javaee.github.io/glassfish/) + (CDDL 1.1) (GPL2 w/ CPE) aopalliance-repackaged (org.glassfish.hk2.external:aopalliance-repackaged:jar:2.5.0-b42 - https://javaee.github.io/glassfish/) + (CDDL 1.1) (GPL2 w/ CPE) javax.inject:1 as OSGi bundle (org.glassfish.hk2.external:javax.inject:jar:2.4.0-b25 - https://hk2.java.net/external/javax.inject) + (CDDL 1.1) (GPL2 w/ CPE) jersey-client (org.glassfish.jersey.core:jersey-client:jar:2.19 - https://jersey.github.io/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-common (org.glassfish.jersey.core:jersey-common:jar:2.19 - https://jersey.github.io/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-guava (org.glassfish.jersey.bundles.repackaged:jersey-guava:jar:2.19 - https://jersey.github.io/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-client (org.glassfish.jersey.core:jersey-client:jar:2.22.1 - https://jersey.github.io/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-common (org.glassfish.jersey.core:jersey-common:jar:2.22.1 - https://jersey.github.io/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-media-multipart (org.glassfish.jersey.media:jersey-media-multipart:jar:2.22.1 - https://jersey.github.io/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-guava (org.glassfish.jersey.bundles.repackaged:jersey-guava:jar:2.22.1 - https://jersey.github.io/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-client (com.sun.jersey:jersey-client:jar:1.9 - https://jersey.java.net/jersey-client/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-core (com.sun.jersey:jersey-core:jar:1.9 - https://jersey.java.net/jersey-core/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-json (com.sun.jersey:jersey-json:jar:1.9 - https://jersey.java.net/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-server (com.sun.jersey:jersey-server:jar:1.9 - https://jersey.java.net/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-guice (com.sun.jersey.contribs:jersey-guice:jar:1.9 - https://jersey.java.net/) + + + (CDDL 1.1) (GPL2 w/ CPE) Old JAXB Runtime (com.sun.xml.bind:jaxb-impl:jar:2.2.3-1 - http://jaxb.java.net/) + (CDDL 1.1) (GPL2 w/ CPE) Java Architecture For XML Binding (javax.xml.bind:jaxb-api:jar:2.2.2 - https://jaxb.dev.java.net/) + (CDDL 1.1) (GPL2 w/ CPE) MIME Streaming Extension (org.jvnet.mimepull:mimepull:jar:1.9.3 - http://mimepull.java.net) + (CDDL 1.1) (GPL2 w/ CPE) JavaMail API (compat) (javax.mail:mail:jar:1.4.7 - http://kenai.com/projects/javamail/mail) + (CDDL 1.1) (GPL2 w/ CPE) JSP Implementation (org.glassfish.web:javax.servlet.jsp:jar:2.3.2 - http://jsp.java.net) + (CDDL 1.1) (GPL2 w/ CPE) JavaServer Pages (TM) TagLib Implementation (org.glassfish.web:javax.servlet.jsp.jstl:jar:1.2.2 - http://jstl.java.net) + (CDDL 1.1) (GPL2 w/ CPE) Expression Language 3.0 (org.glassfish:javax.el:jar:3.0.0 - http://el-spec.java.net) + (CDDL 1.1) (GPL2 w/ CPE) JavaServer Pages(TM) API (javax.servlet.jsp:javax.servlet.jsp-api:jar:2.3.1 - http://jsp.java.net) + (CDDL 1.1) (GPL2 w/ CPE) Expression Language 3.0 API (javax.el:javax.el-api:jar:3.0.0 - http://uel-spec.java.net) + +***************** +Public Domain +***************** + +The following binary components are provided to the 'Public Domain'. See project link for details. + + (Public Domain) AOP Alliance 1.0 (http://aopalliance.sourceforge.net/) + +The following binary components are provided under the Creative Commons Zero license version 1.0. See project link for details. + + (CC0v1.0) JSR166e for Twitter (com.twitter:jsr166e:jar:1.1.0 - https://github.com/twitter/jsr166e) diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/pom.xml index 97eac38587..7883e3a0a7 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/pom.xml +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/pom.xml @@ -13,72 +13,125 @@ limitations under the License. --> - - 4.0.0 - - org.apache.nifi - nifi-druid-bundle - 1.5.0-SNAPSHOT - - - nifi-druid-controller-service-api - - + + 4.0.0 + + org.apache.nifi + nifi-druid-bundle + 1.5.0-SNAPSHOT + + + nifi-druid-controller-service-api + jar + + org.apache.nifi nifi-api provided - - io.druid - tranquility-core_2.10 - 0.8.2 - - - io.druid - druid-console - - - io.druid - druid-aws-common - - - com.twitter - finagle-core_2.10 - - - c3p0 - c3p0 - - - io.tesla.aether - tesla-aether - - - org.glassfish - javax.el - - - com.sun.jersey - * - - - org.eclipse.jetty - * - - - org.eclipse.aether - aether-api - - - com.maxmind.geoip2 - geoip2 - - - net.java.dev.jets3t - jets3t - - - - + + io.druid + tranquility-core_2.11 + + + io.druid + druid-console + + + io.druid + druid-aws-common + + + c3p0 + c3p0 + + + io.tesla.aether + tesla-aether + + + org.glassfish + javax.el + + + com.sun.jersey + * + + + org.eclipse.jetty + * + + + org.eclipse.aether + aether-api + + + com.maxmind.geoip2 + geoip2 + + + net.java.dev.jets3t + jets3t + + + io.druid + druid-server + + + mysql + mysql-connector-java + + + com.google.code.findbugs + jsr305 + + + org.bouncycastle + bcprov-jdk15on + + + + + io.druid + druid-server + ${druid.version} + + + * + * + + + + + io.druid + druid-processing + ${druid.version} + + + net.java.dev.jets3t + jets3t + + + io.airlift + airline + + + com.google.code.findbugs + jsr305 + + + + + com.github.stephenc.findbugs + findbugs-annotations + 1.3.9-1 + + + org.bouncycastle + bcprov-jdk15on + provided + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/src/main/java/org/apache/nifi/controller/api/DruidTranquilityService.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/src/main/java/org/apache/nifi/controller/api/druid/DruidTranquilityService.java similarity index 87% rename from nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/src/main/java/org/apache/nifi/controller/api/DruidTranquilityService.java rename to nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/src/main/java/org/apache/nifi/controller/api/druid/DruidTranquilityService.java index 705303f4ed..ca83207fa2 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/src/main/java/org/apache/nifi/controller/api/DruidTranquilityService.java +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/src/main/java/org/apache/nifi/controller/api/druid/DruidTranquilityService.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.nifi.controller.api; +package org.apache.nifi.controller.api.druid; import java.util.Map; @@ -23,6 +23,8 @@ import org.apache.nifi.controller.ControllerService; import com.metamx.tranquility.tranquilizer.Tranquilizer; -public interface DruidTranquilityService extends ControllerService{ - Tranquilizer> getTranquilizer(); +public interface DruidTranquilityService extends ControllerService { + Tranquilizer> getTranquilizer(); + + String getTransitUri(); } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml index 27149692fc..d852fe1a12 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml @@ -35,61 +35,22 @@ org.apache.nifi nifi-utils - - org.apache.nifi - nifi-processor-utils - org.apache.nifi nifi-druid-controller-service-api 1.5.0-SNAPSHOT + provided io.druid - tranquility-core_2.10 - 0.8.2 - - - io.druid - druid-console - - - io.druid - druid-aws-common - - - c3p0 - c3p0 - - - io.tesla.aether - tesla-aether - - - org.glassfish - javax.el - - - com.sun.jersey - * - - - org.eclipse.jetty - * - - - org.eclipse.aether - aether-api - - - com.maxmind.geoip2 - geoip2 - - - net.java.dev.jets3t - jets3t - - + tranquility-core_2.11 + provided + + + org.apache.nifi + nifi-mock + 1.5.0-SNAPSHOT + test \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java similarity index 61% rename from nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java rename to nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java index 6d4ee1939c..e5af8ea847 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java @@ -15,31 +15,38 @@ * limitations under the License. */ -package org.apache.nifi.controller; +package org.apache.nifi.controller.druid; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; +import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.controller.api.DruidTranquilityService; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.api.druid.DruidTranquilityService; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.reporting.InitializationException; import org.codehaus.jackson.map.ObjectMapper; -import org.joda.time.DateTime; -import org.joda.time.Period; import com.metamx.common.Granularity; import com.metamx.tranquility.beam.Beam; @@ -63,37 +70,74 @@ import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.LongMaxAggregatorFactory; import io.druid.query.aggregation.LongMinAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; +import org.joda.time.DateTime; +import org.joda.time.Period; + @Tags({"Druid", "Timeseries", "OLAP", "ingest"}) @CapabilityDescription("Asynchronously sends flowfiles to Druid Indexing Task using Tranquility API. " + "If aggregation and roll-up of data is required, an Aggregator JSON descriptor needs to be provided." + "Details on how describe aggregation using JSON can be found at: http://druid.io/docs/latest/querying/aggregations.html") public class DruidTranquilityController extends AbstractControllerService implements DruidTranquilityService { - private String firehosePattern = "druid:firehose:%s"; - private int clusterPartitions = 1; - private int clusterReplication = 1; - private String indexRetryPeriod = "PT10M"; + private final static String FIREHOSE_PATTERN = "druid:firehose:%s"; + + private final static AllowableValue PT1M = new AllowableValue("PT1M", "1 minute", "1 minute"); + private final static AllowableValue PT10M = new AllowableValue("PT10M", "10 minutes", "10 minutes"); + private final static AllowableValue PT60M = new AllowableValue("PT60M", "60 minutes", "1 hour"); + + private final static List TIME_ORDINALS = Arrays.asList("SECOND", "MINUTE", "FIVE_MINUTE", "TEN_MINUTE", "FIFTEEN_MINUTE", "HOUR", "SIX_HOUR", "DAY", "WEEK", "MONTH", "YEAR"); private Tranquilizer tranquilizer = null; + private String transitUri = ""; public static final PropertyDescriptor DATASOURCE = new PropertyDescriptor.Builder() .name("druid-cs-data-source") .displayName("Druid Data Source") - .description("Druid Data Source") //TODO description, example + .description("A data source is the Druid equivalent of a database table.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .required(true) .expressionLanguageSupported(true) .build(); - public static final PropertyDescriptor CONNECT_STRING = new PropertyDescriptor.Builder() + public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING = new PropertyDescriptor.Builder() .name("druid-cs-zk-connect-string") .displayName("Zookeeper Connection String") - .description("ZK Connect String for Druid") //TODO example + .description("A comma-separated list of host:port pairs, each corresponding to a ZooKeeper server. Ex: localhost:2181") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) .build(); + public static final PropertyDescriptor ZOOKEEPER_RETRY_BASE_SLEEP_TIME = new PropertyDescriptor.Builder() + .name("druid-cs-zk-retry-base-sleep") + .displayName("Zookeeper Retry Base Sleep Time") + .description("When a connection to Zookeeper needs to be retried, this property specifies the amount of time (in milliseconds) to wait at first before retrying.") + .required(true) + .defaultValue("1000") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor ZOOKEEPER_RETRY_MAX_RETRIES = new PropertyDescriptor.Builder() + .name("druid-cs-zk-retry-max-retries") + .displayName("Zookeeper Retry Max Retries") + .description("When a connection to Zookeeper needs to be retried, this property specifies how many times to attempt reconnection.") + .required(true) + .defaultValue("20") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor ZOOKEEPER_RETRY_SLEEP_TIME = new PropertyDescriptor.Builder() + .name("druid-cs-zk-retry-sleep") + .displayName("Zookeeper Retry Sleep Time") + .description("When a connection to Zookeeper needs to be retried, this property specifies the amount of time to sleep (in milliseconds) between retries.") + .required(true) + .defaultValue("30000") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + public static final PropertyDescriptor DRUID_INDEX_SERVICE_PATH = new PropertyDescriptor.Builder() .name("druid-cs-index-service-path") .displayName("Index Service Path") @@ -114,10 +158,30 @@ public class DruidTranquilityController extends AbstractControllerService implem .expressionLanguageSupported(true) .build(); + public static final PropertyDescriptor CLUSTER_PARTITIONS = new PropertyDescriptor.Builder() + .name("druid-cs-cluster-partitions") + .displayName("Cluster Partitions") + .description("The number of partitions in the Druid cluster.") + .required(true) + .defaultValue("1") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor CLUSTER_REPLICATION = new PropertyDescriptor.Builder() + .name("druid-cs-cluster-replication") + .displayName("Cluster Replication") + .description("The replication factor for the Druid cluster.") + .required(true) + .defaultValue("1") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + public static final PropertyDescriptor TIMESTAMP_FIELD = new PropertyDescriptor.Builder() .name("druid-cs-timestamp-field") .displayName("Timestamp field") - .description("The name of the field that will be used as the timestamp. Should be in ISO format.") + .description("The name of the field that will be used as the timestamp. Should be in ISO8601 format.") .required(true) .defaultValue("timestamp") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) @@ -151,7 +215,17 @@ public class DruidTranquilityController extends AbstractControllerService implem + "}" + "]") .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator((subject, value, context) -> { // Non-empty and valid JSON validator + if (value == null || value.isEmpty()) { + return new ValidationResult.Builder().subject(subject).input(value).valid(false).explanation(subject + " cannot be empty").build(); + } + try { + DruidTranquilityController.parseJsonString(value); + return new ValidationResult.Builder().subject(subject).input(value).valid(true).build(); + } catch (IllegalArgumentException iae) { + return new ValidationResult.Builder().subject(subject).input(value).valid(false).explanation(subject + " is not valid Aggregator JSON").build(); + } + }) .expressionLanguageSupported(true) .build(); @@ -167,20 +241,30 @@ public class DruidTranquilityController extends AbstractControllerService implem public static final PropertyDescriptor SEGMENT_GRANULARITY = new PropertyDescriptor.Builder() .name("druid-cs-segment-granularity") .displayName("Segment Granularity") - .description("Time unit by which to group and aggregate/rollup events.") + .description("Time unit by which to group and aggregate/rollup events. The value must be at least as large as the value of Query Granularity.") .required(true) - .allowableValues("NONE", "SECOND", "MINUTE", "TEN_MINUTE", "HOUR", "DAY", "MONTH", "YEAR", "Use druid.segment.granularity variable") - .defaultValue("MINUTE") + .allowableValues("NONE", "SECOND", "MINUTE", "TEN_MINUTE", "HOUR", "DAY", "MONTH", "YEAR") + .defaultValue("TEN_MINUTE") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); public static final PropertyDescriptor QUERY_GRANULARITY = new PropertyDescriptor.Builder() .name("druid-cs-query-granularity") .displayName("Query Granularity") - .description("Time unit by which to group and aggregate/rollup events. The value must be at least as large as the value of Segment Granularity.") + .description("Time unit by which to group and aggregate/rollup events. The value must be less than or equal to the value of Segment Granularity.") .required(true) - .allowableValues("NONE", "SECOND", "MINUTE", "TEN_MINUTE", "HOUR", "DAY", "MONTH", "YEAR", "Use druid.query.granularity variable") - .defaultValue("TEN_MINUTE") + .allowableValues("NONE", "SECOND", "MINUTE", "FIFTEEN_MINUTE", "THIRTY_MINUTE", "HOUR", "DAY", "MONTH", "YEAR") + .defaultValue("MINUTE") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor INDEX_RETRY_PERIOD = new PropertyDescriptor.Builder() + .name("druid-cs-index-retry-period") + .displayName("Index Retry Period") + .description("Grace period to allow late arriving events for real time ingest.") + .required(true) + .allowableValues(PT1M, PT10M, PT60M) + .defaultValue(PT10M.getValue()) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); @@ -189,8 +273,8 @@ public class DruidTranquilityController extends AbstractControllerService implem .displayName("Late Event Grace Period") .description("Grace period to allow late arriving events for real time ingest.") .required(true) - .allowableValues("PT1M", "PT10M", "PT60M")// TODO possibly friendly name - .defaultValue("PT10M") + .allowableValues(PT1M, PT10M, PT60M) + .defaultValue(PT10M.getValue()) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); @@ -201,6 +285,7 @@ public class DruidTranquilityController extends AbstractControllerService implem .required(true) .defaultValue("2000") .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor MAX_PENDING_BATCHES = new PropertyDescriptor.Builder() @@ -210,6 +295,7 @@ public class DruidTranquilityController extends AbstractControllerService implem .required(true) .defaultValue("5") .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor LINGER_MILLIS = new PropertyDescriptor.Builder() @@ -221,16 +307,27 @@ public class DruidTranquilityController extends AbstractControllerService implem .required(true) .defaultValue("1000") .addValidator(StandardValidators.INTEGER_VALIDATOR) + .expressionLanguageSupported(true) .build(); private static final List properties; + private volatile CuratorFramework curator; + private volatile int zkBaseSleepMillis; + private volatile int zkMaxRetries; + private volatile int zkSleepMillis; + static { final List props = new ArrayList<>(); props.add(DATASOURCE); - props.add(CONNECT_STRING); + props.add(ZOOKEEPER_CONNECTION_STRING); + props.add(ZOOKEEPER_RETRY_BASE_SLEEP_TIME); + props.add(ZOOKEEPER_RETRY_MAX_RETRIES); + props.add(ZOOKEEPER_RETRY_SLEEP_TIME); props.add(DRUID_INDEX_SERVICE_PATH); props.add(DRUID_DISCOVERY_PATH); + props.add(CLUSTER_PARTITIONS); + props.add(CLUSTER_REPLICATION); props.add(DIMENSIONS_LIST); props.add(AGGREGATOR_JSON); props.add(SEGMENT_GRANULARITY); @@ -249,24 +346,47 @@ public class DruidTranquilityController extends AbstractControllerService implem return properties; } + @Override + protected Collection customValidate(ValidationContext validationContext) { + Set results = new HashSet<>(); + final String segmentGranularity = validationContext.getProperty(SEGMENT_GRANULARITY).getValue(); + final String queryGranularity = validationContext.getProperty(QUERY_GRANULARITY).getValue(); + + // Verify that segment granularity is as least as large as query granularity + if (TIME_ORDINALS.indexOf(segmentGranularity) < TIME_ORDINALS.indexOf(queryGranularity)) { + results.add(new ValidationResult.Builder().valid(false).explanation( + "Segment Granularity must be at least as large as Query Granularity").build()); + } + + return results; + } + @OnEnabled - public void onConfigured(final ConfigurationContext context) throws InitializationException { + public void onConfigured(final ConfigurationContext context) { ComponentLog log = getLogger(); log.info("Starting Druid Tranquility Controller Service..."); - final String dataSource = context.getProperty(DATASOURCE).getValue(); - final String zkConnectString = context.getProperty(CONNECT_STRING).getValue(); - final String indexService = context.getProperty(DRUID_INDEX_SERVICE_PATH).getValue(); - final String discoveryPath = context.getProperty(DRUID_DISCOVERY_PATH).getValue(); - final String timestampField = context.getProperty(TIMESTAMP_FIELD).getValue(); + final String dataSource = context.getProperty(DATASOURCE).evaluateAttributeExpressions().getValue(); + final String zkConnectString = context.getProperty(ZOOKEEPER_CONNECTION_STRING).evaluateAttributeExpressions().getValue(); + zkBaseSleepMillis = context.getProperty(ZOOKEEPER_RETRY_BASE_SLEEP_TIME).evaluateAttributeExpressions().asInteger(); + zkMaxRetries = context.getProperty(ZOOKEEPER_RETRY_BASE_SLEEP_TIME).evaluateAttributeExpressions().asInteger(); + zkSleepMillis = context.getProperty(ZOOKEEPER_RETRY_SLEEP_TIME).evaluateAttributeExpressions().asInteger(); + final String indexService = context.getProperty(DRUID_INDEX_SERVICE_PATH).evaluateAttributeExpressions().getValue(); + final String discoveryPath = context.getProperty(DRUID_DISCOVERY_PATH).evaluateAttributeExpressions().getValue(); + final int clusterPartitions = context.getProperty(CLUSTER_PARTITIONS).evaluateAttributeExpressions().asInteger(); + final int clusterReplication = context.getProperty(CLUSTER_REPLICATION).evaluateAttributeExpressions().asInteger(); + final String timestampField = context.getProperty(TIMESTAMP_FIELD).evaluateAttributeExpressions().getValue(); final String segmentGranularity = context.getProperty(SEGMENT_GRANULARITY).getValue(); final String queryGranularity = context.getProperty(QUERY_GRANULARITY).getValue(); final String windowPeriod = context.getProperty(WINDOW_PERIOD).getValue(); - final String aggregatorJSON = context.getProperty(AGGREGATOR_JSON).getValue(); + final String indexRetryPeriod = context.getProperty(INDEX_RETRY_PERIOD).getValue(); + final String aggregatorJSON = context.getProperty(AGGREGATOR_JSON).evaluateAttributeExpressions().getValue(); final String dimensionsStringList = context.getProperty(DIMENSIONS_LIST).getValue(); - final int maxBatchSize = Integer.valueOf(context.getProperty(MAX_BATCH_SIZE).getValue()); - final int maxPendingBatches = Integer.valueOf(context.getProperty(MAX_PENDING_BATCHES).getValue()); - final int lingerMillis = Integer.valueOf(context.getProperty(LINGER_MILLIS).getValue()); + final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions().asInteger(); + final int maxPendingBatches = context.getProperty(MAX_PENDING_BATCHES).evaluateAttributeExpressions().asInteger(); + final int lingerMillis = context.getProperty(LINGER_MILLIS).evaluateAttributeExpressions().asInteger(); + + transitUri = String.format(FIREHOSE_PATTERN, dataSource) + ";indexServicePath=" + indexService; final List dimensions = getDimensions(dimensionsStringList); final List aggregator = getAggregatorList(aggregatorJSON); @@ -282,36 +402,50 @@ public class DruidTranquilityController extends AbstractControllerService implem Iterator aggIterator = aggregator.iterator(); AggregatorFactory currFactory; - log.debug("Number of Aggregations Defined: " + aggregator.size()); + log.debug("Number of Aggregations Defined: {}", new Object[]{aggregator.size()}); while (aggIterator.hasNext()) { currFactory = aggIterator.next(); - log.debug("Verifying Aggregator Definition"); - log.debug("Aggregator Name: " + currFactory.getName()); - log.debug("Aggregator Type: " + currFactory.getTypeName()); - log.debug("Aggregator Req Fields: " + currFactory.requiredFields()); + log.debug("Verifying Aggregator Definition\n\tAggregator Name: {}\n\tAggregator Type: {}\n\tAggregator Req Fields: {}", + new Object[]{currFactory.getName(), currFactory.getTypeName(), currFactory.requiredFields()}); } // Tranquility uses ZooKeeper (through Curator) for coordination. - final CuratorFramework curator = CuratorFrameworkFactory - .builder() - .connectString(zkConnectString) - .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000)) // TODO expose as properties, maybe fibonacci backoff - .build(); + curator = getCurator(zkConnectString); curator.start(); // The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default, // Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp. final TimestampSpec timestampSpec = new TimestampSpec(timestampField, "auto", null); - final Beam> beam = DruidBeams.builder(timestamper) + final Beam> beam = buildBeam(dataSource, indexService, discoveryPath, clusterPartitions, clusterReplication, + segmentGranularity, queryGranularity, windowPeriod, indexRetryPeriod, dimensions, aggregator, timestamper, timestampSpec); + + tranquilizer = buildTranquilizer(maxBatchSize, maxPendingBatches, lingerMillis, beam); + + tranquilizer.start(); + } + + Tranquilizer> buildTranquilizer(int maxBatchSize, int maxPendingBatches, int lingerMillis, Beam> beam) { + return Tranquilizer.builder() + .maxBatchSize(maxBatchSize) + .maxPendingBatches(maxPendingBatches) + .lingerMillis(lingerMillis) + .blockOnFull(true) + .build(beam); + } + + Beam> buildBeam(String dataSource, String indexService, String discoveryPath, int clusterPartitions, int clusterReplication, + String segmentGranularity, String queryGranularity, String windowPeriod, String indexRetryPeriod, List dimensions, + List aggregator, Timestamper> timestamper, TimestampSpec timestampSpec) { + return DruidBeams.builder(timestamper) .curator(curator) .discoveryPath(discoveryPath) - .location(DruidLocation.create(DruidEnvironment.create(indexService, firehosePattern), dataSource)) + .location(DruidLocation.create(DruidEnvironment.create(indexService, FIREHOSE_PATTERN), dataSource)) .timestampSpec(timestampSpec) .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregator, QueryGranularity.fromString(queryGranularity))) .tuning( ClusteredBeamTuning .builder() - .segmentGranularity(getSegmentGranularity(segmentGranularity)) + .segmentGranularity(getGranularity(segmentGranularity)) .windowPeriod(new Period(windowPeriod)) .partitions(clusterPartitions) .replicants(clusterReplication) @@ -323,24 +457,46 @@ public class DruidTranquilityController extends AbstractControllerService implem .indexRetryPeriod(new Period(indexRetryPeriod)) .build()) .buildBeam(); + } - tranquilizer = Tranquilizer.builder() - .maxBatchSize(maxBatchSize) - .maxPendingBatches(maxPendingBatches) - .lingerMillis(lingerMillis) - .blockOnFull(true) - .build(beam); + @OnDisabled + public void onDisabled() { + if (tranquilizer != null) { + tranquilizer.flush(); + tranquilizer.stop(); + tranquilizer = null; + } - tranquilizer.start(); + if (curator != null) { + curator.close(); + curator = null; + } } public Tranquilizer getTranquilizer() { return tranquilizer; } - private List> parseJsonString(String aggregatorJson) { + CuratorFramework getCurator(String zkConnectString) { + return CuratorFrameworkFactory + .builder() + .connectString(zkConnectString) + .retryPolicy(new ExponentialBackoffRetry(zkBaseSleepMillis, zkMaxRetries, zkSleepMillis)) + .build(); + } + + @Override + public String getTransitUri() { + return transitUri; + } + + @SuppressWarnings("unchecked") + private static List> parseJsonString(String aggregatorJson) { + if (aggregatorJson == null) { + return Collections.EMPTY_LIST; + } ObjectMapper mapper = new ObjectMapper(); - List> aggSpecList = null; + final List> aggSpecList; try { aggSpecList = mapper.readValue(aggregatorJson, List.class); return aggSpecList; @@ -350,7 +506,13 @@ public class DruidTranquilityController extends AbstractControllerService implem } private List getDimensions(String dimensionStringList) { - List dimensionList = new LinkedList(Arrays.asList(dimensionStringList.split(","))); + List dimensionList = new ArrayList<>(); + if (dimensionStringList != null) { + Arrays.stream(dimensionStringList.split(",")) + .filter(StringUtils::isNotBlank) + .map(String::trim) + .forEach(dimensionList::add); + } return dimensionList; } @@ -407,10 +569,10 @@ public class DruidTranquilityController extends AbstractControllerService implem return new CountAggregatorFactory(map.get("name")); } - private Granularity getSegmentGranularity(String segmentGranularity) { + private Granularity getGranularity(String granularityString) { Granularity granularity = Granularity.HOUR; - switch (segmentGranularity) { + switch (granularityString) { case "SECOND": granularity = Granularity.SECOND; break; diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 53d6d0626d..201af175e9 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.controller.DruidTranquilityController \ No newline at end of file +org.apache.nifi.controller.druid.DruidTranquilityController \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/DruidTranquilityControllerTest.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/DruidTranquilityControllerTest.java deleted file mode 100644 index 95d5e9dd35..0000000000 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/DruidTranquilityControllerTest.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - - -public class DruidTranquilityControllerTest { - -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/druid/DruidTranquilityControllerTest.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/druid/DruidTranquilityControllerTest.java new file mode 100644 index 0000000000..56c2616132 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/druid/DruidTranquilityControllerTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.druid; + + +import com.metamx.tranquility.tranquilizer.Tranquilizer; +import org.apache.curator.framework.CuratorFramework; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.api.druid.DruidTranquilityService; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.mockito.Mockito.mock; + +public class DruidTranquilityControllerTest { + + private TestRunner runner; + private MockDruidTranquilityController service; + + @Before + public void setup() throws Exception { + runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class); + service = new MockDruidTranquilityController(); + runner.addControllerService("Client Service", service); + } + + @Test + public void testValid() { + runner.assertNotValid(service); + runner.setProperty(service, DruidTranquilityController.DATASOURCE, "test"); + runner.assertNotValid(service); + runner.setProperty(service, DruidTranquilityController.ZOOKEEPER_CONNECTION_STRING, "localhost:2181"); + runner.assertNotValid(service); + runner.setProperty(service, DruidTranquilityController.AGGREGATOR_JSON, "[{\"type\": \"count\", \"name\": \"count\"}]"); + runner.assertNotValid(service); + runner.setProperty(service, DruidTranquilityController.DIMENSIONS_LIST, "dim1,dim2"); + runner.assertValid(service); + } + + public static class MockDruidTranquilityController extends DruidTranquilityController { + + Tranquilizer t = mock(Tranquilizer.class); + CuratorFramework c = mock(CuratorFramework.class); + + @Override + public Tranquilizer getTranquilizer() { + return t; + } + + @Override + CuratorFramework getCurator(String zkConnectString) { + return c; + } + } + + public static class TestControllerServiceProcessor extends AbstractProcessor { + + static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder() + .name("Client Service") + .description("DruidTranquilityService") + .identifiesControllerService(DruidTranquilityService.class) + .required(true) + .build(); + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + } + + @Override + protected List getSupportedPropertyDescriptors() { + List propertyDescriptors = new ArrayList<>(); + propertyDescriptors.add(CLIENT_SERVICE); + return propertyDescriptors; + } + } + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/pom.xml index 746a0779d1..6a384c610b 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/pom.xml +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/pom.xml @@ -25,6 +25,11 @@ nar + + org.apache.nifi + nifi-druid-controller-service-api-nar + nar + org.apache.nifi nifi-druid-controller-service diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000000..f3c8ecef26 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,209 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +APACHE NIFI SUBCOMPONENTS: + +The Apache NiFi project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for the these +subcomponents is subject to the terms and conditions of the following +licenses. diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..b8c958ccad --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,5 @@ +nifi-druid-nar +Copyright 2015-2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml index 6ce35f687a..f596c6f877 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml @@ -27,69 +27,48 @@ org.apache.nifi nifi-api - 1.3.0 + provided org.apache.nifi nifi-utils - 1.3.0 - - - org.apache.nifi - nifi-processor-utils - 1.3.0 + + org.apache.nifi + nifi-record + + + org.apache.nifi + nifi-record-serialization-service-api + org.apache.nifi nifi-druid-controller-service-api 1.5.0-SNAPSHOT + provided io.druid - tranquility-core_2.10 - 0.8.2 - - - io.druid - druid-console - - - io.druid - druid-aws-common - - - c3p0 - c3p0 - - - io.tesla.aether - tesla-aether - - - org.glassfish - javax.el - - - com.sun.jersey - * - - - org.eclipse.jetty - * - - - org.eclipse.aether - aether-api - - - com.maxmind.geoip2 - geoip2 - - - net.java.dev.jets3t - jets3t - - + tranquility-core_2.11 + provided + + org.apache.nifi + nifi-mock + 1.5.0-SNAPSHOT + test + + + org.apache.nifi + nifi-mock-record-utils + 1.5.0-SNAPSHOT + test + + + org.apache.nifi + nifi-druid-controller-service + 1.5.0-SNAPSHOT + test + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java deleted file mode 100644 index 3134d5ae1c..0000000000 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.processors; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.AbstractSessionFactoryProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessSessionFactory; -import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.stream.io.StreamUtils; - -import org.codehaus.jackson.map.ObjectMapper; - -import org.apache.nifi.controller.api.DruidTranquilityService; -import com.metamx.tranquility.tranquilizer.MessageDroppedException; -import com.metamx.tranquility.tranquilizer.Tranquilizer; -import com.twitter.util.Await; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; - -import scala.runtime.BoxedUnit; - -@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) -@Tags({"Druid", "Timeseries", "OLAP", "ingest"}) -@CapabilityDescription("Sends events to Apache Druid for Indexing. " - + "Leverages Druid Tranquility Controller service." - + "Incoming flow files are expected to contain 1 or many JSON objects, one JSON object per line") -public class PutDruid extends AbstractSessionFactoryProcessor { - - private List properties; - private Set relationships; - private final Map messageStatus = new HashMap<>(); - - public static final PropertyDescriptor DRUID_TRANQUILITY_SERVICE = new PropertyDescriptor.Builder() - .name("putdruid-tranquility-service") - .displayName("Tranquility Service") - .description("Tranquility Service to use for sending events to Druid") - .required(true) - .identifiesControllerService(DruidTranquilityService.class) - .build(); - - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Success relationship") - .build(); - - public static final Relationship REL_FAIL = new Relationship.Builder() - .name("failure") - .description("FlowFiles are routed to this relationship when they cannot be parsed") - .build(); - - public static final Relationship REL_DROPPED = new Relationship.Builder() - .name("dropped") - .description("FlowFiles are routed to this relationship when they are outside of the configured time window, timestamp format is invalid, ect...") - .build(); - - public void init(final ProcessorInitializationContext context) { - List properties = new ArrayList<>(); - properties.add(DRUID_TRANQUILITY_SERVICE); - this.properties = Collections.unmodifiableList(properties); - - Set relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_DROPPED); - relationships.add(REL_FAIL); - this.relationships = Collections.unmodifiableSet(relationships); - } - - @Override - public Set getRelationships() { - return relationships; - } - - @Override - protected List getSupportedPropertyDescriptors() { - return properties; - } - - //Method breaks down incoming flow file and sends it to Druid Indexing service - private void processFlowFile(ProcessContext context, ProcessSession session) { - final ComponentLog log = getLogger(); - - //Get handle on Druid Tranquility session - DruidTranquilityService tranquilityController = context.getProperty(DRUID_TRANQUILITY_SERVICE) - .asControllerService(DruidTranquilityService.class); - Tranquilizer> tranquilizer = tranquilityController.getTranquilizer(); - - final FlowFile flowFile = session.get(); - if (flowFile == null || flowFile.getSize() == 0) { - return; - } - - //Get data from flow file body - final byte[] buffer = new byte[(int) flowFile.getSize()]; - session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer)); - - - String contentString = new String(buffer, StandardCharsets.UTF_8); - Map contentMap = null; - - //Create payload array from flow file content, one element per line - String[] messageArray = contentString.split("\\R"); - - //Convert each array element from JSON to HashMap and send to Druid - for (String message : messageArray) { - try { - contentMap = new ObjectMapper().readValue(message, HashMap.class); - } catch (IOException e1) { - log.error("Error parsing incoming message array in the flowfile body"); - } - - log.debug("Tranquilizer Status: " + tranquilizer.status().toString()); - messageStatus.put(flowFile, "pending"); - //Send data element to Druid, Asynch - Future future = tranquilizer.send(contentMap); - log.debug(" Sent Payload to Druid: " + contentMap); - - //Wait for Druid to call back with status - future.addEventListener(new FutureEventListener() { - @Override - public void onFailure(Throwable cause) { - if (cause instanceof MessageDroppedException) { - //This happens when event timestamp targets a Druid Indexing task that has closed (Late Arriving Data) - log.error(" FlowFile Dropped due to MessageDroppedException: " + cause.getMessage() + " : " + cause); - cause.getStackTrace(); - log.error(" Transferring FlowFile to DROPPED relationship"); - session.transfer(flowFile, REL_DROPPED); - } else { - log.error("FlowFile Processing Failed due to: {} : {}", new Object[]{cause.getMessage(), cause}); - cause.printStackTrace(); - log.error(" Transferring FlowFile to FAIL relationship"); - session.transfer(flowFile, REL_FAIL); - } - } - - @Override - public void onSuccess(Object value) { - log.debug(" FlowFile Processing Success : " + value.toString()); - session.transfer(flowFile, REL_SUCCESS); - session.getProvenanceReporter().send(flowFile, "Druid Tranquility Service"); - } - }); - - try { - //Wait for result from Druid - //This method will be asynch since this is a SessionFactoryProcessor and OnTrigger will create a new Thread - Await.result(future); - } catch (Exception e) { - getLogger().error("Caught exception while waiting for result of put request: " + e.getMessage()); - } - } - //session.transfer(flowFile, REL_SUCCESS); - session.commit(); - } - - public void onTrigger(ProcessContext context, ProcessSessionFactory factory) throws ProcessException { - final ProcessSession session = factory.createSession(); - //Create new Thread to ensure that waiting for callback does not reduce throughput - new Thread(() -> { - processFlowFile(context, session); - }).start(); - } -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/druid/PutDruidRecord.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/druid/PutDruidRecord.java new file mode 100644 index 0000000000..19e47a9c35 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/druid/PutDruidRecord.java @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.druid; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import com.metamx.tranquility.tranquilizer.MessageDroppedException; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; + +import org.apache.nifi.controller.api.druid.DruidTranquilityService; +import com.metamx.tranquility.tranquilizer.Tranquilizer; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import scala.runtime.BoxedUnit; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"druid", "timeseries", "olap", "ingest", "put", "record"}) +@CapabilityDescription("Sends records to Druid for Indexing. Leverages Druid Tranquility Controller service.") +@WritesAttribute(attribute = "record.count", description = "The number of messages that were sent to Druid for this FlowFile. FlowFiles on the success relationship will have a value " + + "of this attribute that indicates the number of records successfully processed by Druid, and the FlowFile content will be only the successful records. This behavior applies " + + "to the failure and dropped relationships as well.") +public class PutDruidRecord extends AbstractSessionFactoryProcessor { + + static final String RECORD_COUNT = "record.count"; + + private List properties; + private Set relationships; + + static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() + .name("putdruid-record-reader") + .displayName("Record Reader") + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + + static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder() + .name("putdruid-record-writer") + .displayName("Record Writer") + .description("The Record Writer to use in order to serialize the data to outgoing relationships.") + .identifiesControllerService(RecordSetWriterFactory.class) + .expressionLanguageSupported(false) + .required(true) + .build(); + + static final PropertyDescriptor DRUID_TRANQUILITY_SERVICE = new PropertyDescriptor.Builder() + .name("putdruid-tranquility-service") + .displayName("Tranquility Service") + .description("Tranquility Service to use for sending events to Druid.") + .required(true) + .identifiesControllerService(DruidTranquilityService.class) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles are routed to this relationship when they are successfully processed by Druid") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles are routed to this relationship when they cannot be parsed or otherwise processed by Druid") + .build(); + + static final Relationship REL_DROPPED = new Relationship.Builder() + .name("dropped") + .description("FlowFiles are routed to this relationship when they are outside of the configured time window, timestamp format is invalid, ect...") + .build(); + + + public void init(final ProcessorInitializationContext context) { + List properties = new ArrayList<>(); + properties.add(RECORD_READER_FACTORY); + properties.add(RECORD_WRITER_FACTORY); + properties.add(DRUID_TRANQUILITY_SERVICE); + this.properties = Collections.unmodifiableList(properties); + + Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_DROPPED); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + /** + * Parses the record(s), converts each to a Map, and sends via Tranquility to the Druid Indexing Service + * + * @param context The process context + * @param session The process session + */ + @SuppressWarnings("unchecked") + private void processFlowFile(ProcessContext context, final ProcessSession session) { + final ComponentLog log = getLogger(); + + // Get handle on Druid Tranquility session + DruidTranquilityService tranquilityController = context.getProperty(DRUID_TRANQUILITY_SERVICE) + .asControllerService(DruidTranquilityService.class); + Tranquilizer> tranquilizer = tranquilityController.getTranquilizer(); + + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + // Create the outgoing flow files and output streams + FlowFile droppedFlowFile = session.create(flowFile); + final AtomicInteger droppedFlowFileCount = new AtomicInteger(0); + FlowFile failedFlowFile = session.create(flowFile); + final AtomicInteger failedFlowFileCount = new AtomicInteger(0); + FlowFile successfulFlowFile = session.create(flowFile); + final AtomicInteger successfulFlowFileCount = new AtomicInteger(0); + + final AtomicInteger recordWriteErrors = new AtomicInteger(0); + + int recordCount = 0; + final OutputStream droppedOutputStream = session.write(droppedFlowFile); + final RecordSetWriter droppedRecordWriter; + final OutputStream failedOutputStream = session.write(failedFlowFile); + final RecordSetWriter failedRecordWriter; + final OutputStream successfulOutputStream = session.write(successfulFlowFile); + final RecordSetWriter successfulRecordWriter; + try (final InputStream in = session.read(flowFile)) { + + final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY) + .asControllerService(RecordReaderFactory.class); + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class); + + final Map attributes = flowFile.getAttributes(); + + final RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, getLogger()); + final RecordSchema outSchema = writerFactory.getSchema(attributes, reader.getSchema()); + droppedRecordWriter = writerFactory.createWriter(log, outSchema, droppedOutputStream); + droppedRecordWriter.beginRecordSet(); + failedRecordWriter = writerFactory.createWriter(log, outSchema, failedOutputStream); + failedRecordWriter.beginRecordSet(); + successfulRecordWriter = writerFactory.createWriter(log, outSchema, successfulOutputStream); + successfulRecordWriter.beginRecordSet(); + + Record r; + while ((r = reader.nextRecord()) != null) { + final Record record = r; + recordCount++; + // Convert each Record to HashMap and send to Druid + Map contentMap = (Map) DataTypeUtils.convertRecordFieldtoObject(r, RecordFieldType.RECORD.getRecordDataType(r.getSchema())); + + log.debug("Tranquilizer Status: {}", new Object[]{tranquilizer.status().toString()}); + // Send data element to Druid asynchronously + Future future = tranquilizer.send(contentMap); + log.debug("Sent Payload to Druid: {}", new Object[]{contentMap}); + + // Wait for Druid to call back with status + future.addEventListener(new FutureEventListener() { + @Override + public void onFailure(Throwable cause) { + if (cause instanceof MessageDroppedException) { + // This happens when event timestamp targets a Druid Indexing task that has closed (Late Arriving Data) + log.debug("Record Dropped due to MessageDroppedException: {}, transferring record to dropped.", new Object[]{cause.getMessage()}, cause); + try { + synchronized (droppedRecordWriter) { + droppedRecordWriter.write(record); + droppedRecordWriter.flush(); + droppedFlowFileCount.incrementAndGet(); + } + } catch (final IOException ioe) { + log.error("Error transferring record to dropped, this may result in data loss.", new Object[]{ioe.getMessage()}, ioe); + recordWriteErrors.incrementAndGet(); + } + + } else { + log.error("FlowFile Processing Failed due to: {}", new Object[]{cause.getMessage()}, cause); + try { + synchronized (failedRecordWriter) { + failedRecordWriter.write(record); + failedRecordWriter.flush(); + failedFlowFileCount.incrementAndGet(); + } + } catch (final IOException ioe) { + log.error("Error transferring record to failure, this may result in data loss.", new Object[]{ioe.getMessage()}, ioe); + recordWriteErrors.incrementAndGet(); + } + } + } + + @Override + public void onSuccess(Object value) { + log.debug(" FlowFile Processing Success: {}", new Object[]{value.toString()}); + try { + synchronized (successfulRecordWriter) { + successfulRecordWriter.write(record); + successfulRecordWriter.flush(); + successfulFlowFileCount.incrementAndGet(); + } + } catch (final IOException ioe) { + log.error("Error transferring record to success, this may result in data loss. " + + "However the record was successfully processed by Druid", new Object[]{ioe.getMessage()}, ioe); + recordWriteErrors.incrementAndGet(); + } + } + }); + + } + + } catch (IOException | SchemaNotFoundException | MalformedRecordException e) { + log.error("FlowFile Processing Failed due to: {}", new Object[]{e.getMessage()}, e); + // The FlowFile will be obtained and the error logged below, when calling publishResult.getFailedFlowFiles() + flowFile = session.putAttribute(flowFile, RECORD_COUNT, Integer.toString(recordCount)); + session.transfer(flowFile, REL_FAILURE); + try { + droppedOutputStream.close(); + session.remove(droppedFlowFile); + } catch (IOException ioe) { + log.error("Error closing output stream for FlowFile with dropped records.", ioe); + } + try { + failedOutputStream.close(); + session.remove(failedFlowFile); + } catch (IOException ioe) { + log.error("Error closing output stream for FlowFile with failed records.", ioe); + } + try { + successfulOutputStream.close(); + session.remove(successfulFlowFile); + } catch (IOException ioe) { + log.error("Error closing output stream for FlowFile with successful records.", ioe); + } + session.commit(); + return; + } + + if (recordCount == 0) { + // Send original (empty) flow file to success, remove the rest + flowFile = session.putAttribute(flowFile, RECORD_COUNT, "0"); + session.transfer(flowFile, REL_SUCCESS); + try { + droppedOutputStream.close(); + session.remove(droppedFlowFile); + } catch (IOException ioe) { + log.error("Error closing output stream for FlowFile with dropped records.", ioe); + } + try { + failedOutputStream.close(); + session.remove(failedFlowFile); + } catch (IOException ioe) { + log.error("Error closing output stream for FlowFile with failed records.", ioe); + } + try { + successfulOutputStream.close(); + session.remove(successfulFlowFile); + } catch (IOException ioe) { + log.error("Error closing output stream for FlowFile with successful records.", ioe); + } + } else { + + // Wait for all the records to finish processing + while (recordCount != (droppedFlowFileCount.get() + failedFlowFileCount.get() + successfulFlowFileCount.get() + recordWriteErrors.get())) { + Thread.yield(); + } + + // Send partitioned flow files out to their relationships (or remove them if empty) + + try { + droppedRecordWriter.finishRecordSet(); + droppedRecordWriter.close(); + } catch (IOException ioe) { + log.error("Error closing FlowFile with dropped records: {}", new Object[]{ioe.getMessage()}, ioe); + session.rollback(); + throw new ProcessException(ioe); + } + if (droppedFlowFileCount.get() > 0) { + droppedFlowFile = session.putAttribute(droppedFlowFile, RECORD_COUNT, Integer.toString(droppedFlowFileCount.get())); + session.transfer(droppedFlowFile, REL_DROPPED); + } else { + session.remove(droppedFlowFile); + } + + try { + failedRecordWriter.finishRecordSet(); + failedRecordWriter.close(); + } catch (IOException ioe) { + log.error("Error closing FlowFile with failed records: {}", new Object[]{ioe.getMessage()}, ioe); + session.rollback(); + throw new ProcessException(ioe); + } + if (failedFlowFileCount.get() > 0) { + failedFlowFile = session.putAttribute(failedFlowFile, RECORD_COUNT, Integer.toString(failedFlowFileCount.get())); + session.transfer(failedFlowFile, REL_FAILURE); + } else { + session.remove(failedFlowFile); + } + + try { + successfulRecordWriter.finishRecordSet(); + successfulRecordWriter.close(); + } catch (IOException ioe) { + log.error("Error closing FlowFile with successful records: {}", new Object[]{ioe.getMessage()}, ioe); + session.rollback(); + throw new ProcessException(ioe); + } + if (successfulFlowFileCount.get() > 0) { + successfulFlowFile = session.putAttribute(successfulFlowFile, RECORD_COUNT, Integer.toString(successfulFlowFileCount.get())); + session.transfer(successfulFlowFile, REL_SUCCESS); + session.getProvenanceReporter().send(successfulFlowFile, tranquilityController.getTransitUri()); + } else { + session.remove(successfulFlowFile); + } + + session.remove(flowFile); + } + + session.commit(); + } + + public void onTrigger(ProcessContext context, ProcessSessionFactory factory) throws ProcessException { + final ProcessSession session = factory.createSession(); + processFlowFile(context, session); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 4136d5e5a6..25ef74799d 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.processors.PutDruid \ No newline at end of file +org.apache.nifi.processors.druid.PutDruidRecord \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java new file mode 100644 index 0000000000..695212c98b --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.druid; + +import com.metamx.tranquility.beam.Beam; +import com.metamx.tranquility.tranquilizer.MessageDroppedException; +import com.metamx.tranquility.tranquilizer.Tranquilizer; +import com.metamx.tranquility.typeclass.Timestamper; +import com.twitter.finagle.Status; +import com.twitter.util.Awaitable; +import com.twitter.util.Duration; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import com.twitter.util.TimeoutException; +import com.twitter.util.Try; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.query.aggregation.AggregatorFactory; +import org.apache.curator.framework.CuratorFramework; +import scala.Function1; +import scala.Option; +import scala.runtime.BoxedUnit; + +import java.util.List; +import java.util.Map; + +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class MockDruidTranquilityController extends DruidTranquilityController { + + private final Tranquilizer t; + private final CuratorFramework cf; + private int numCalls = 0; + + public MockDruidTranquilityController() { + this(-1, -1); + } + + /** + * Creates a mock/stub Druid controller for testing. The failAfterN parameter must be higher than the dropAfterN parameter in order for messages to be dropped. + * + * @param dropAfterN The number of records after which to start calling the "dropped" callback, namely onFailure(MessageDroppedException) + * @param failAfterN The number of records after which to start calling the "failure" callback, namely onFailure(Exception) + */ + public MockDruidTranquilityController(final int dropAfterN, final int failAfterN) { + t = mock(Tranquilizer.class); + final Future future = new Future() { + + FutureEventListener listener; + + @Override + public Future addEventListener(FutureEventListener listener) { + this.listener = listener; + numCalls++; + if (dropAfterN >= 0 && numCalls > failAfterN) { + listener.onFailure(new Exception()); + } else if (dropAfterN >= 0 && numCalls > dropAfterN) { + listener.onFailure(MessageDroppedException.Instance()); + } else { + listener.onSuccess(BoxedUnit.UNIT); + } + return this; + } + + @Override + public Awaitable ready(Duration timeout, CanAwait permit) throws InterruptedException, TimeoutException { + return null; + } + + @Override + public BoxedUnit result(Duration timeout, CanAwait permit) throws Exception { + return null; + } + + @Override + public boolean isReady(CanAwait permit) { + return true; + } + + @Override + public Future respond(Function1, BoxedUnit> k) { + return null; + } + + @Override + public Option> poll() { + return null; + } + + @Override + public void raise(Throwable interrupt) { + + } + + @Override + public Future transform(Function1, Future> f) { + return null; + } + }; + when(t.send(anyObject())).thenReturn(future); + when(t.status()).thenReturn(new Status() { + }); + cf = mock(CuratorFramework.class); + } + + @Override + public Tranquilizer getTranquilizer() { + return t; + } + + @Override + CuratorFramework getCurator(String zkConnectString) { + return cf; + } + + @SuppressWarnings("unchecked") + @Override + Tranquilizer> buildTranquilizer(int maxBatchSize, int maxPendingBatches, int lingerMillis, Beam> beam) { + return t; + } + + @SuppressWarnings("unchecked") + @Override + Beam> buildBeam(String dataSource, String indexService, String discoveryPath, int clusterPartitions, int clusterReplication, + String segmentGranularity, String queryGranularity, String windowPeriod, String indexRetryPeriod, List dimensions, + List aggregator, Timestamper> timestamper, TimestampSpec timestampSpec) { + return mock(Beam.class); + } + + @Override + public String getTransitUri() { + return ""; + } + +} diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/processors/PutDruidTest.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/processors/PutDruidTest.java deleted file mode 100644 index 0137624382..0000000000 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/processors/PutDruidTest.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors; - - -public class PutDruidTest { - -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/processors/druid/PutDruidRecordTest.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/processors/druid/PutDruidRecordTest.java new file mode 100644 index 0000000000..28e084c4ef --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/processors/druid/PutDruidRecordTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.druid; + + +import org.apache.nifi.controller.druid.DruidTranquilityController; +import org.apache.nifi.controller.druid.MockDruidTranquilityController; +import org.apache.nifi.controller.api.druid.DruidTranquilityService; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + + +public class PutDruidRecordTest { + + private TestRunner runner; + private DruidTranquilityService druidTranquilityController; + private MockRecordParser recordReader; + private MockRecordWriter recordWriter; + + @Before + public void setUp() throws Exception { + runner = TestRunners.newTestRunner(PutDruidRecord.class); + druidTranquilityController = new MockDruidTranquilityController(2,3); + recordReader = new MockRecordParser(); + recordWriter = new MockRecordWriter(null, true, 2); + runner.addControllerService("reader", recordReader); + runner.enableControllerService(recordReader); + runner.addControllerService("writer", recordWriter); + runner.enableControllerService(recordWriter); + + runner.addControllerService("tranquility", druidTranquilityController); + runner.setProperty(druidTranquilityController, DruidTranquilityController.DATASOURCE, "test"); + runner.setProperty(druidTranquilityController, DruidTranquilityController.ZOOKEEPER_CONNECTION_STRING, "localhost:2181"); + runner.setProperty(druidTranquilityController, DruidTranquilityController.AGGREGATOR_JSON, "[{\"type\": \"count\", \"name\": \"count\"}]"); + runner.setProperty(druidTranquilityController, DruidTranquilityController.DIMENSIONS_LIST, "dim1,dim2"); + runner.assertValid(druidTranquilityController); + runner.enableControllerService(druidTranquilityController); + + runner.setProperty(PutDruidRecord.RECORD_READER_FACTORY, "reader"); + runner.setProperty(PutDruidRecord.RECORD_WRITER_FACTORY, "writer"); + runner.setProperty(PutDruidRecord.DRUID_TRANQUILITY_SERVICE, "tranquility"); + } + + @Test + public void testEmptyRecord() throws Exception { + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("sport", RecordFieldType.STRING); + + runner.enqueue(""); + runner.run(); + + runner.assertTransferCount(PutDruidRecord.REL_SUCCESS, 1); + runner.assertTransferCount(PutDruidRecord.REL_FAILURE, 0); + runner.assertTransferCount(PutDruidRecord.REL_DROPPED, 0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDruidRecord.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals(PutDruidRecord.RECORD_COUNT, "0"); + } + + @Test + public void testPutRecords() throws Exception { + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("sport", RecordFieldType.STRING); + + recordReader.addRecord("John Doe", 48, "Soccer"); + recordReader.addRecord("Jane Doe", 47, "Tennis"); + recordReader.addRecord("Sally Doe", 47, "Curling"); // Will be dropped due to the "drop after 2" parameter on the MockDruidTranquilityController + recordReader.addRecord("Jimmy Doe", 14, null); // Will fail due to the "fail after 3" parameter on the MockDruidTranquilityController + recordReader.addRecord("Pizza Doe", 14, null); // Will fail due to the "fail after 3" parameter on the MockDruidTranquilityController + recordReader.addRecord("Bad Record", "X", 13); // RecordWriter fail due to the "fail after 2" parameter on the MockRecordWriter, not written to output + + runner.enqueue(""); + runner.run(); + + runner.assertTransferCount(PutDruidRecord.REL_SUCCESS, 1); + runner.assertTransferCount(PutDruidRecord.REL_FAILURE, 1); + runner.assertTransferCount(PutDruidRecord.REL_DROPPED, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDruidRecord.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals(PutDruidRecord.RECORD_COUNT, "2"); + flowFile = runner.getFlowFilesForRelationship(PutDruidRecord.REL_DROPPED).get(0); + flowFile.assertAttributeEquals(PutDruidRecord.RECORD_COUNT, "1"); + flowFile = runner.getFlowFilesForRelationship(PutDruidRecord.REL_FAILURE).get(0); + flowFile.assertAttributeEquals(PutDruidRecord.RECORD_COUNT, "2"); + + // Assert a single SEND event present for the successful flow file + assertEquals(1, runner.getProvenanceEvents().stream().filter((e) -> ProvenanceEventType.SEND.equals(e.getEventType())).count()); + } + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/pom.xml index 6b19faf82d..82d3017bfd 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-druid-bundle/pom.xml @@ -27,10 +27,25 @@ pom - UTF-8 - 1.8 - 1.8 + 0.9.1 + 0.8.2 + + + + + io.druid + tranquility-core_2.11 + ${tranquility.version} + + + io.druid + druid-processing + ${druid.version} + + + + nifi-druid-nar nifi-druid-controller-service-api-nar diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 46eb2e3b48..2632a4ae75 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -91,6 +91,7 @@ nifi-metrics-reporting-bundle nifi-spark-bundle nifi-atlas-bundle + nifi-druid-bundle