From 1a443c73ecb0ed00233b66d282031aa5b37afb24 Mon Sep 17 00:00:00 2001 From: Alex Savitsky Date: Tue, 8 Jan 2019 10:15:45 -0500 Subject: [PATCH] NIFI-5937 use processor-configured encoding instead of the system default NIFI-5937 added tests to verify that accented characters are preserved correctly NIFI-5937 unfolding starred imports NIFI-5937 unfolding starred imports (now with statics) Signed-off-by: Ed This closes #3250 --- .../PutElasticsearchHttpRecord.java | 5 +- .../TestPutElasticsearchHttpRecord.java | 94 ++++++++++++++----- 2 files changed, 73 insertions(+), 26 deletions(-) diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java index ac36604981..52de42442a 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java @@ -73,6 +73,7 @@ import java.io.IOException; import java.io.InputStream; import java.math.BigInteger; import java.net.URL; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -198,6 +199,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess descriptors.add(ID_RECORD_PATH); descriptors.add(INDEX); descriptors.add(TYPE); + descriptors.add(CHARSET); descriptors.add(INDEX_OP); descriptors.add(SUPPRESS_NULLS); @@ -313,6 +315,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess final String id_path = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue(); final RecordPath recordPath = StringUtils.isEmpty(id_path) ? null : recordPathCache.getCompiled(id_path); final StringBuilder sb = new StringBuilder(); + final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue()); int recordCount = 0; try (final InputStream in = session.read(flowFile); @@ -345,7 +348,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess writeRecord(record, record.getSchema(), generator); generator.flush(); generator.close(); - json.append(out.toString()); + json.append(out.toString(charset.name())); buildBulkCommand(sb, index, docType, indexOp, id, json.toString()); recordCount++; diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java index 862e177068..2cc16c1aba 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java @@ -16,18 +16,15 @@ */ package org.apache.nifi.processors.elasticsearch; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.net.ConnectException; -import java.util.HashMap; -import java.util.List; - +import com.fasterxml.jackson.databind.ObjectMapper; +import okhttp3.Call; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Protocol; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.ResponseBody; +import okio.Buffer; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.provenance.ProvenanceEventRecord; @@ -42,16 +39,21 @@ import org.junit.After; import org.junit.Ignore; import org.junit.Test; -import okhttp3.Call; -import okhttp3.MediaType; -import okhttp3.OkHttpClient; -import okhttp3.Protocol; -import okhttp3.Request; -import okhttp3.Response; -import okhttp3.ResponseBody; +import java.io.IOException; +import java.net.ConnectException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TestPutElasticsearchHttpRecord { - private TestRunner runner; @After @@ -61,7 +63,25 @@ public class TestPutElasticsearchHttpRecord { @Test public void testPutElasticSearchOnTriggerIndex() throws IOException { - runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures + PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(false); + processor.setRecordChecks(record -> { + assertEquals(1, record.get("id")); + assertEquals("reç1", record.get("name")); + assertEquals(101, record.get("code")); + }, record -> { + assertEquals(2, record.get("id")); + assertEquals("ræc2", record.get("name")); + assertEquals(102, record.get("code")); + }, record -> { + assertEquals(3, record.get("id")); + assertEquals("rèc3", record.get("name")); + assertEquals(103, record.get("code")); + }, record -> { + assertEquals(4, record.get("id")); + assertEquals("rëc4", record.get("name")); + assertEquals(104, record.get("code")); + }); + runner = TestRunners.newTestRunner(processor); // no failures generateTestData(); runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); @@ -368,6 +388,7 @@ public class TestPutElasticsearchHttpRecord { int statusCode = 200; String statusMessage = "OK"; String expectedUrl = null; + Consumer[] recordChecks; PutElasticsearchHttpRecordTestProcessor(boolean responseHasFailures) { this.responseHasFailures = responseHasFailures; @@ -382,6 +403,11 @@ public class TestPutElasticsearchHttpRecord { expectedUrl = url; } + @SafeVarargs + final void setRecordChecks(Consumer... checks) { + recordChecks = checks; + } + @Override protected void createElasticsearchClient(ProcessContext context) throws ProcessException { client = mock(OkHttpClient.class); @@ -391,6 +417,24 @@ public class TestPutElasticsearchHttpRecord { if (statusCode != -1) { Request realRequest = (Request) invocationOnMock.getArguments()[0]; assertTrue((expectedUrl == null) || (expectedUrl.equals(realRequest.url().toString()))); + if (recordChecks != null) { + final ObjectMapper mapper = new ObjectMapper(); + Buffer sink = new Buffer(); + realRequest.body().writeTo(sink); + String line; + int recordIndex = 0; + boolean content = false; + while ((line = sink.readUtf8Line()) != null) { + if (content) { + content = false; + if (recordIndex < recordChecks.length) { + recordChecks[recordIndex++].accept(mapper.readValue(line, Map.class)); + } + } else { + content = true; + } + } + } StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \""); sb.append(responseHasFailures); sb.append("\", \"items\": ["); @@ -521,9 +565,9 @@ public class TestPutElasticsearchHttpRecord { parser.addSchemaField("name", RecordFieldType.STRING); parser.addSchemaField("code", RecordFieldType.INT); - parser.addRecord(1, "rec1", 101); - parser.addRecord(2, "rec2", 102); - parser.addRecord(3, "rec3", 103); - parser.addRecord(4, "rec4", 104); + parser.addRecord(1, "reç1", 101); + parser.addRecord(2, "ræc2", 102); + parser.addRecord(3, "rèc3", 103); + parser.addRecord(4, "rëc4", 104); } }