NIFI-5937 use processor-configured encoding instead of the system default

NIFI-5937 added tests to verify that accented characters are preserved correctly

NIFI-5937 unfolding starred imports

NIFI-5937 unfolding starred imports (now with statics)

Signed-off-by: Ed <edward.berezitsky@gmail.com>

This closes #3250
This commit is contained in:
Alex Savitsky 2019-01-08 10:15:45 -05:00 committed by Ed
parent e6e4175d71
commit 1a443c73ec
2 changed files with 73 additions and 26 deletions

View File

@ -73,6 +73,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.math.BigInteger; import java.math.BigInteger;
import java.net.URL; import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -198,6 +199,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
descriptors.add(ID_RECORD_PATH); descriptors.add(ID_RECORD_PATH);
descriptors.add(INDEX); descriptors.add(INDEX);
descriptors.add(TYPE); descriptors.add(TYPE);
descriptors.add(CHARSET);
descriptors.add(INDEX_OP); descriptors.add(INDEX_OP);
descriptors.add(SUPPRESS_NULLS); descriptors.add(SUPPRESS_NULLS);
@ -313,6 +315,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
final String id_path = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue(); final String id_path = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
final RecordPath recordPath = StringUtils.isEmpty(id_path) ? null : recordPathCache.getCompiled(id_path); final RecordPath recordPath = StringUtils.isEmpty(id_path) ? null : recordPathCache.getCompiled(id_path);
final StringBuilder sb = new StringBuilder(); final StringBuilder sb = new StringBuilder();
final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
int recordCount = 0; int recordCount = 0;
try (final InputStream in = session.read(flowFile); try (final InputStream in = session.read(flowFile);
@ -345,7 +348,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
writeRecord(record, record.getSchema(), generator); writeRecord(record, record.getSchema(), generator);
generator.flush(); generator.flush();
generator.close(); generator.close();
json.append(out.toString()); json.append(out.toString(charset.name()));
buildBulkCommand(sb, index, docType, indexOp, id, json.toString()); buildBulkCommand(sb, index, docType, indexOp, id, json.toString());
recordCount++; recordCount++;

View File

@ -16,18 +16,15 @@
*/ */
package org.apache.nifi.processors.elasticsearch; package org.apache.nifi.processors.elasticsearch;
import static org.junit.Assert.assertEquals; import com.fasterxml.jackson.databind.ObjectMapper;
import static org.junit.Assert.assertNotNull; import okhttp3.Call;
import static org.junit.Assert.assertTrue; import okhttp3.MediaType;
import static org.mockito.Matchers.any; import okhttp3.OkHttpClient;
import static org.mockito.Mockito.mock; import okhttp3.Protocol;
import static org.mockito.Mockito.when; import okhttp3.Request;
import okhttp3.Response;
import java.io.IOException; import okhttp3.ResponseBody;
import java.net.ConnectException; import okio.Buffer;
import java.util.HashMap;
import java.util.List;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRecord;
@ -42,16 +39,21 @@ import org.junit.After;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import okhttp3.Call; import java.io.IOException;
import okhttp3.MediaType; import java.net.ConnectException;
import okhttp3.OkHttpClient; import java.util.HashMap;
import okhttp3.Protocol; import java.util.List;
import okhttp3.Request; import java.util.Map;
import okhttp3.Response; import java.util.function.Consumer;
import okhttp3.ResponseBody;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestPutElasticsearchHttpRecord { public class TestPutElasticsearchHttpRecord {
private TestRunner runner; private TestRunner runner;
@After @After
@ -61,7 +63,25 @@ public class TestPutElasticsearchHttpRecord {
@Test @Test
public void testPutElasticSearchOnTriggerIndex() throws IOException { public void testPutElasticSearchOnTriggerIndex() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(false);
processor.setRecordChecks(record -> {
assertEquals(1, record.get("id"));
assertEquals("reç1", record.get("name"));
assertEquals(101, record.get("code"));
}, record -> {
assertEquals(2, record.get("id"));
assertEquals("ræc2", record.get("name"));
assertEquals(102, record.get("code"));
}, record -> {
assertEquals(3, record.get("id"));
assertEquals("rèc3", record.get("name"));
assertEquals(103, record.get("code"));
}, record -> {
assertEquals(4, record.get("id"));
assertEquals("rëc4", record.get("name"));
assertEquals(104, record.get("code"));
});
runner = TestRunners.newTestRunner(processor); // no failures
generateTestData(); generateTestData();
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
@ -368,6 +388,7 @@ public class TestPutElasticsearchHttpRecord {
int statusCode = 200; int statusCode = 200;
String statusMessage = "OK"; String statusMessage = "OK";
String expectedUrl = null; String expectedUrl = null;
Consumer<Map>[] recordChecks;
PutElasticsearchHttpRecordTestProcessor(boolean responseHasFailures) { PutElasticsearchHttpRecordTestProcessor(boolean responseHasFailures) {
this.responseHasFailures = responseHasFailures; this.responseHasFailures = responseHasFailures;
@ -382,6 +403,11 @@ public class TestPutElasticsearchHttpRecord {
expectedUrl = url; expectedUrl = url;
} }
@SafeVarargs
final void setRecordChecks(Consumer<Map>... checks) {
recordChecks = checks;
}
@Override @Override
protected void createElasticsearchClient(ProcessContext context) throws ProcessException { protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
client = mock(OkHttpClient.class); client = mock(OkHttpClient.class);
@ -391,6 +417,24 @@ public class TestPutElasticsearchHttpRecord {
if (statusCode != -1) { if (statusCode != -1) {
Request realRequest = (Request) invocationOnMock.getArguments()[0]; Request realRequest = (Request) invocationOnMock.getArguments()[0];
assertTrue((expectedUrl == null) || (expectedUrl.equals(realRequest.url().toString()))); assertTrue((expectedUrl == null) || (expectedUrl.equals(realRequest.url().toString())));
if (recordChecks != null) {
final ObjectMapper mapper = new ObjectMapper();
Buffer sink = new Buffer();
realRequest.body().writeTo(sink);
String line;
int recordIndex = 0;
boolean content = false;
while ((line = sink.readUtf8Line()) != null) {
if (content) {
content = false;
if (recordIndex < recordChecks.length) {
recordChecks[recordIndex++].accept(mapper.readValue(line, Map.class));
}
} else {
content = true;
}
}
}
StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \""); StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \"");
sb.append(responseHasFailures); sb.append(responseHasFailures);
sb.append("\", \"items\": ["); sb.append("\", \"items\": [");
@ -521,9 +565,9 @@ public class TestPutElasticsearchHttpRecord {
parser.addSchemaField("name", RecordFieldType.STRING); parser.addSchemaField("name", RecordFieldType.STRING);
parser.addSchemaField("code", RecordFieldType.INT); parser.addSchemaField("code", RecordFieldType.INT);
parser.addRecord(1, "rec1", 101); parser.addRecord(1, "reç1", 101);
parser.addRecord(2, "rec2", 102); parser.addRecord(2, "ræc2", 102);
parser.addRecord(3, "rec3", 103); parser.addRecord(3, "rèc3", 103);
parser.addRecord(4, "rec4", 104); parser.addRecord(4, "rëc4", 104);
} }
} }