mirror of
https://github.com/apache/nifi.git
synced 2025-03-03 07:59:15 +00:00
NIFI-5449: Added Base64 Encode/Decode functions to RecordPath
NIFI-5449: Incorporated review comments This closes #2920 Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
parent
57ae9b65a0
commit
5a84d650c3
@ -0,0 +1,54 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.record.path.functions;
|
||||
|
||||
import org.apache.nifi.record.path.FieldValue;
|
||||
import org.apache.nifi.record.path.RecordPathEvaluationContext;
|
||||
import org.apache.nifi.record.path.StandardFieldValue;
|
||||
import org.apache.nifi.record.path.paths.RecordPathSegment;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Base64;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class Base64Decode extends RecordPathSegment {
|
||||
private final RecordPathSegment recordPath;
|
||||
|
||||
public Base64Decode(final RecordPathSegment recordPath, final boolean absolute) {
|
||||
super("base64Decode", null, absolute);
|
||||
this.recordPath = recordPath;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
|
||||
final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
|
||||
return fieldValues.filter(fv -> fv.getValue() != null)
|
||||
.map(fv -> {
|
||||
|
||||
Object value = fv.getValue();
|
||||
if (value instanceof String) {
|
||||
return new StandardFieldValue(new String(Base64.getDecoder().decode(fv.getValue().toString()), StandardCharsets.UTF_8), fv.getField(), fv.getParent().orElse(null));
|
||||
} else if (value instanceof byte[]) {
|
||||
return new StandardFieldValue(Base64.getDecoder().decode((byte[]) value), fv.getField(), fv.getParent().orElse(null));
|
||||
} else {
|
||||
throw new IllegalArgumentException("Argument supplied to base64Decode must be a String or byte[]");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,58 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.record.path.functions;
|
||||
|
||||
import org.apache.nifi.record.path.FieldValue;
|
||||
import org.apache.nifi.record.path.RecordPathEvaluationContext;
|
||||
import org.apache.nifi.record.path.StandardFieldValue;
|
||||
import org.apache.nifi.record.path.paths.RecordPathSegment;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.Base64;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class Base64Encode extends RecordPathSegment {
|
||||
private final RecordPathSegment recordPath;
|
||||
|
||||
public Base64Encode(final RecordPathSegment recordPath, final boolean absolute) {
|
||||
super("base64Encode", null, absolute);
|
||||
this.recordPath = recordPath;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
|
||||
final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
|
||||
return fieldValues.filter(fv -> fv.getValue() != null)
|
||||
.map(fv -> {
|
||||
|
||||
Object value = fv.getValue();
|
||||
if (value instanceof String) {
|
||||
try {
|
||||
return new StandardFieldValue(Base64.getEncoder().encodeToString(value.toString().getBytes("UTF-8")), fv.getField(), fv.getParent().orElse(null));
|
||||
} catch (final UnsupportedEncodingException e) {
|
||||
return null; // won't happen.
|
||||
}
|
||||
} else if (value instanceof byte[]) {
|
||||
return new StandardFieldValue(Base64.getEncoder().encode((byte[]) value), fv.getField(), fv.getParent().orElse(null));
|
||||
} else {
|
||||
throw new IllegalArgumentException("Argument supplied to base64Encode must be a String or byte[]");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
}
|
@ -64,6 +64,8 @@ import org.apache.nifi.record.path.filter.NotEqualsFilter;
|
||||
import org.apache.nifi.record.path.filter.NotFilter;
|
||||
import org.apache.nifi.record.path.filter.RecordPathFilter;
|
||||
import org.apache.nifi.record.path.filter.StartsWith;
|
||||
import org.apache.nifi.record.path.functions.Base64Decode;
|
||||
import org.apache.nifi.record.path.functions.Base64Encode;
|
||||
import org.apache.nifi.record.path.functions.Concat;
|
||||
import org.apache.nifi.record.path.functions.Format;
|
||||
import org.apache.nifi.record.path.functions.FieldName;
|
||||
@ -264,6 +266,14 @@ public class RecordPathCompiler {
|
||||
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
|
||||
return new Format(args[0], args[1], absolute);
|
||||
}
|
||||
case "base64Encode": {
|
||||
final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute);
|
||||
return new Base64Encode(args[0], absolute);
|
||||
}
|
||||
case "base64Decode": {
|
||||
final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute);
|
||||
return new Base64Decode(args[0], absolute);
|
||||
}
|
||||
default: {
|
||||
throw new RecordPathException("Invalid function call: The '" + functionName + "' function does not exist or can only "
|
||||
+ "be used within a predicate, not as a standalone function");
|
||||
|
@ -27,11 +27,14 @@ import java.sql.Date;
|
||||
import java.text.DateFormat;
|
||||
import java.text.ParseException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Base64;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import org.apache.nifi.record.path.exception.RecordPathException;
|
||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||
@ -1279,6 +1282,73 @@ public class TestRecordPath {
|
||||
RecordPath.compile("toBytes(/s, \"NOT A REAL CHARSET\")").evaluate(record).getSelectedFields().findFirst().get().getValue();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBase64Encode() {
|
||||
final List<RecordField> fields = new ArrayList<>();
|
||||
fields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType()));
|
||||
fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
|
||||
fields.add(new RecordField("b", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
|
||||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
|
||||
final List<Object> expectedValues = Arrays.asList(
|
||||
Base64.getEncoder().encodeToString("John".getBytes(StandardCharsets.UTF_8)),
|
||||
Base64.getEncoder().encodeToString("Doe".getBytes(StandardCharsets.UTF_8)),
|
||||
Base64.getEncoder().encode("xyz".getBytes(StandardCharsets.UTF_8))
|
||||
);
|
||||
final Map<String, Object> values = new HashMap<>();
|
||||
values.put("firstName", "John");
|
||||
values.put("lastName", "Doe");
|
||||
values.put("b", "xyz".getBytes(StandardCharsets.UTF_8));
|
||||
final Record record = new MapRecord(schema, values);
|
||||
|
||||
assertEquals(Base64.getEncoder().encodeToString("John".getBytes(StandardCharsets.UTF_8)),
|
||||
RecordPath.compile("base64Encode(/firstName)").evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
assertEquals(Base64.getEncoder().encodeToString("Doe".getBytes(StandardCharsets.UTF_8)),
|
||||
RecordPath.compile("base64Encode(/lastName)").evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
assertTrue(Arrays.equals(Base64.getEncoder().encode("xyz".getBytes(StandardCharsets.UTF_8)),
|
||||
(byte[]) RecordPath.compile("base64Encode(/b)").evaluate(record).getSelectedFields().findFirst().get().getValue()));
|
||||
List<Object> actualValues = RecordPath.compile("base64Encode(/*)").evaluate(record).getSelectedFields().map(FieldValue::getValue).collect(Collectors.toList());
|
||||
IntStream.range(0, 3).forEach(i -> {
|
||||
Object expectedObject = expectedValues.get(i);
|
||||
Object actualObject = actualValues.get(i);
|
||||
if (actualObject instanceof String) {
|
||||
assertEquals(expectedObject, actualObject);
|
||||
} else if (actualObject instanceof byte[]) {
|
||||
assertTrue(Arrays.equals((byte[]) expectedObject, (byte[]) actualObject));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBase64Decode() {
|
||||
final List<RecordField> fields = new ArrayList<>();
|
||||
fields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType()));
|
||||
fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
|
||||
fields.add(new RecordField("b", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
|
||||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
|
||||
final List<Object> expectedValues = Arrays.asList("John", "Doe", "xyz".getBytes(StandardCharsets.UTF_8));
|
||||
final Map<String, Object> values = new HashMap<>();
|
||||
values.put("firstName", Base64.getEncoder().encodeToString("John".getBytes(StandardCharsets.UTF_8)));
|
||||
values.put("lastName", Base64.getEncoder().encodeToString("Doe".getBytes(StandardCharsets.UTF_8)));
|
||||
values.put("b", Base64.getEncoder().encode("xyz".getBytes(StandardCharsets.UTF_8)));
|
||||
final Record record = new MapRecord(schema, values);
|
||||
|
||||
assertEquals("John", RecordPath.compile("base64Decode(/firstName)").evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
assertEquals("Doe", RecordPath.compile("base64Decode(/lastName)").evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
assertTrue(Arrays.equals("xyz".getBytes(StandardCharsets.UTF_8), (byte[]) RecordPath.compile("base64Decode(/b)").evaluate(record).getSelectedFields().findFirst().get().getValue()));
|
||||
List<Object> actualValues = RecordPath.compile("base64Decode(/*)").evaluate(record).getSelectedFields().map(FieldValue::getValue).collect(Collectors.toList());
|
||||
IntStream.range(0, 3).forEach(i -> {
|
||||
Object expectedObject = expectedValues.get(i);
|
||||
Object actualObject = actualValues.get(i);
|
||||
if (actualObject instanceof String) {
|
||||
assertEquals(expectedObject, actualObject);
|
||||
} else if (actualObject instanceof byte[]) {
|
||||
assertTrue(Arrays.equals((byte[]) expectedObject, (byte[]) actualObject));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private List<RecordField> getDefaultFields() {
|
||||
final List<RecordField> fields = new ArrayList<>();
|
||||
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
|
||||
|
@ -627,6 +627,65 @@ The following record path expression would re-format the date String:
|
||||
| `format( toDate(/eventDate, "yyyy-MM-dd'T'HH:mm:ss'Z'"), 'yyyy-MM-dd')` | 2017-10-20
|
||||
|==========================================================
|
||||
|
||||
=== base64Encode
|
||||
|
||||
Converts a String or byte[] using Base64 encoding, using the UTF-8 character set. For example, given a schema such as:
|
||||
|
||||
----
|
||||
{
|
||||
"type": "record",
|
||||
"name": "events",
|
||||
"fields": [
|
||||
{ "name": "name", "type": "string" }
|
||||
]
|
||||
}
|
||||
----
|
||||
|
||||
and a record such as:
|
||||
|
||||
----
|
||||
{
|
||||
"name" : "John"
|
||||
}
|
||||
----
|
||||
|
||||
The following record path expression would encode the String using Base64:
|
||||
|
||||
|==========================================================
|
||||
| RecordPath | Return value
|
||||
| `base64Encode(/name)` | Sm9obg==
|
||||
|==========================================================
|
||||
|
||||
=== base64Decode
|
||||
|
||||
Decodes a Base64-encoded String or byte[]. For example, given a schema such as:
|
||||
|
||||
----
|
||||
{
|
||||
"type": "record",
|
||||
"name": "events",
|
||||
"fields": [
|
||||
{ "name": "name", "type": "string" }
|
||||
]
|
||||
}
|
||||
----
|
||||
|
||||
and a record such as:
|
||||
|
||||
----
|
||||
{
|
||||
"name" : "Sm9obg=="
|
||||
}
|
||||
----
|
||||
|
||||
The following record path expression would decode the String using Base64:
|
||||
|
||||
|==========================================================
|
||||
| RecordPath | Return value
|
||||
| `base64Decode(/name)` | John
|
||||
|==========================================================
|
||||
|
||||
|
||||
[[filter_functions]]
|
||||
== Filter Functions
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user