NIFI-5266: Sanitize ES parameters in PutElasticsearchHttp processors

Incorporated review comments
Added integration test with JSON character

This closes #2760.

Signed-off-by: Andy LoPresto <alopresto@apache.org>
This commit is contained in:
Matthew Burgess 2018-06-04 17:04:01 -04:00 committed by Andy LoPresto
parent 0b0ba1eae3
commit aa33989192
No known key found for this signature in database
GPG Key ID: 6EC293152D90B61D
7 changed files with 173 additions and 84 deletions

View File

@ -62,8 +62,8 @@ language governing permissions and limitations under the License. -->
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.7</version>
<artifactId>commons-text</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>

View File

@ -25,6 +25,7 @@ import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.Route;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@ -75,6 +76,7 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
.displayName("Proxy Host")
.description("The fully qualified hostname or IP address of the proxy server")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@ -83,6 +85,7 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
.displayName("Proxy Port")
.description("The port of the proxy server")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
@ -276,4 +279,43 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
final ObjectMapper mapper = new ObjectMapper();
return mapper.readTree(in);
}
protected void buildBulkCommand(StringBuilder sb, String index, String docType, String indexOp, String id, String jsonString) {
if (indexOp.equalsIgnoreCase("index")) {
sb.append("{\"index\": { \"_index\": \"");
sb.append(StringEscapeUtils.escapeJson(index));
sb.append("\", \"_type\": \"");
sb.append(StringEscapeUtils.escapeJson(docType));
sb.append("\"");
if (!StringUtils.isEmpty(id)) {
sb.append(", \"_id\": \"");
sb.append(StringEscapeUtils.escapeJson(id));
sb.append("\"");
}
sb.append("}}\n");
sb.append(jsonString);
sb.append("\n");
} else if (indexOp.equalsIgnoreCase("upsert") || indexOp.equalsIgnoreCase("update")) {
sb.append("{\"update\": { \"_index\": \"");
sb.append(StringEscapeUtils.escapeJson(index));
sb.append("\", \"_type\": \"");
sb.append(StringEscapeUtils.escapeJson(docType));
sb.append("\", \"_id\": \"");
sb.append(StringEscapeUtils.escapeJson(id));
sb.append("\" }\n");
sb.append("{\"doc\": ");
sb.append(jsonString);
sb.append(", \"doc_as_upsert\": ");
sb.append(indexOp.equalsIgnoreCase("upsert"));
sb.append(" }\n");
} else if (indexOp.equalsIgnoreCase("delete")) {
sb.append("{\"delete\": { \"_index\": \"");
sb.append(StringEscapeUtils.escapeJson(index));
sb.append("\", \"_type\": \"");
sb.append(StringEscapeUtils.escapeJson(docType));
sb.append("\", \"_id\": \"");
sb.append(StringEscapeUtils.escapeJson(id));
sb.append("\" }\n");
}
}
}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.elasticsearch;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
@ -61,8 +62,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.commons.lang3.StringUtils.trimToEmpty;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@EventDriven
@ -143,6 +142,8 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> propertyDescriptors;
private static final ObjectMapper mapper = new ObjectMapper();
static {
final Set<Relationship> _rels = new HashSet<>();
_rels.add(REL_SUCCESS);
@ -227,7 +228,10 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
List<FlowFile> flowFilesToTransfer = new LinkedList<>(flowFiles);
final StringBuilder sb = new StringBuilder();
final String baseUrl = trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
final String baseUrl = context.getProperty(ES_URL).evaluateAttributeExpressions().getValue().trim();
if (StringUtils.isEmpty(baseUrl)) {
throw new ProcessException("Elasticsearch URL is empty or null, this indicates an invalid Expression (missing variables, e.g.)");
}
HttpUrl.Builder urlBuilder = HttpUrl.parse(baseUrl).newBuilder().addPathSegment("_bulk");
// Find the user-added properties and set them as query parameters on the URL
@ -288,42 +292,22 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
session.read(file, in -> {
json.append(IOUtils.toString(in, charset).replace("\r\n", " ").replace('\n', ' ').replace('\r', ' '));
});
if (indexOp.equalsIgnoreCase("index")) {
sb.append("{\"index\": { \"_index\": \"");
sb.append(index);
sb.append("\", \"_type\": \"");
sb.append(docType);
sb.append("\"");
if (!StringUtils.isEmpty(id)) {
sb.append(", \"_id\": \"");
sb.append(id);
sb.append("\"");
}
sb.append("}}\n");
sb.append(json);
sb.append("\n");
} else if (indexOp.equalsIgnoreCase("upsert") || indexOp.equalsIgnoreCase("update")) {
sb.append("{\"update\": { \"_index\": \"");
sb.append(index);
sb.append("\", \"_type\": \"");
sb.append(docType);
sb.append("\", \"_id\": \"");
sb.append(id);
sb.append("\" }\n");
sb.append("{\"doc\": ");
sb.append(json);
sb.append(", \"doc_as_upsert\": ");
sb.append(indexOp.equalsIgnoreCase("upsert"));
sb.append(" }\n");
} else if (indexOp.equalsIgnoreCase("delete")) {
sb.append("{\"delete\": { \"_index\": \"");
sb.append(index);
sb.append("\", \"_type\": \"");
sb.append(docType);
sb.append("\", \"_id\": \"");
sb.append(id);
sb.append("\" }\n");
String jsonString = json.toString();
// Ensure the JSON body is well-formed
try {
mapper.readTree(jsonString);
} catch (IOException e) {
logger.error("Flow file content is not valid JSON, penalizing and transferring to failure.",
new Object[]{indexOp, file});
flowFilesToTransfer.remove(file);
file = session.penalize(file);
session.transfer(file, REL_FAILURE);
continue;
}
buildBulkCommand(sb, index, docType, indexOp, id, jsonString);
}
if (!flowFilesToTransfer.isEmpty()) {
RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), sb.toString());
@ -364,10 +348,10 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
if (!isSuccess(status)) {
if (errorReason == null) {
// Use "result" if it is present; this happens for status codes like 404 Not Found, which may not have an error/reason
String reason = itemNode.findPath("//result").asText();
String reason = itemNode.findPath("result").asText();
if (StringUtils.isEmpty(reason)) {
// If there was no result, we expect an error with a string description in the "reason" field
reason = itemNode.findPath("//error/reason").asText();
reason = itemNode.findPath("reason").asText();
}
errorReason = reason;
logger.error("Failed to process {} due to {}, transferring to failure",

View File

@ -80,8 +80,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static org.apache.commons.lang3.StringUtils.trimToEmpty;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@EventDriven
@ -261,7 +259,10 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
OkHttpClient okHttpClient = getClient();
final ComponentLog logger = getLogger();
final String baseUrl = trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
final String baseUrl = context.getProperty(ES_URL).evaluateAttributeExpressions().getValue().trim();
if (StringUtils.isEmpty(baseUrl)) {
throw new ProcessException("Elasticsearch URL is empty or null, this indicates an invalid Expression (missing variables, e.g.)");
}
HttpUrl.Builder urlBuilder = HttpUrl.parse(baseUrl).newBuilder().addPathSegment("_bulk");
// Find the user-added properties and set them as query parameters on the URL
@ -339,42 +340,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
generator.close();
json.append(out.toString());
if (indexOp.equalsIgnoreCase("index")) {
sb.append("{\"index\": { \"_index\": \"");
sb.append(index);
sb.append("\", \"_type\": \"");
sb.append(docType);
sb.append("\"");
if (!StringUtils.isEmpty(id)) {
sb.append(", \"_id\": \"");
sb.append(id);
sb.append("\"");
}
sb.append("}}\n");
sb.append(json);
sb.append("\n");
} else if (indexOp.equalsIgnoreCase("upsert") || indexOp.equalsIgnoreCase("update")) {
sb.append("{\"update\": { \"_index\": \"");
sb.append(index);
sb.append("\", \"_type\": \"");
sb.append(docType);
sb.append("\", \"_id\": \"");
sb.append(id);
sb.append("\" }\n");
sb.append("{\"doc\": ");
sb.append(json);
sb.append(", \"doc_as_upsert\": ");
sb.append(indexOp.equalsIgnoreCase("upsert"));
sb.append(" }\n");
} else if (indexOp.equalsIgnoreCase("delete")) {
sb.append("{\"delete\": { \"_index\": \"");
sb.append(index);
sb.append("\", \"_type\": \"");
sb.append(docType);
sb.append("\", \"_id\": \"");
sb.append(id);
sb.append("\" }\n");
}
buildBulkCommand(sb, index, docType, indexOp, id, json.toString());
}
} catch (IdentifierNotFoundException infe) {
logger.error(infe.getMessage(), new Object[]{flowFile});
@ -422,10 +388,10 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
if (!isSuccess(status)) {
if (errorReason == null) {
// Use "result" if it is present; this happens for status codes like 404 Not Found, which may not have an error/reason
String reason = itemNode.findPath("//result").asText();
String reason = itemNode.findPath("result").asText();
if (StringUtils.isEmpty(reason)) {
// If there was no result, we expect an error with a string description in the "reason" field
reason = itemNode.findPath("//error/reason").asText();
reason = itemNode.findPath("reason").asText();
}
errorReason = reason;
logger.error("Failed to process {} due to {}, transferring to failure",

View File

@ -89,7 +89,6 @@ public class PutElasticsearchHttpRecordIT {
private void setupPut() {
runner.enqueue("");
runner.setValidateExpressionUsage(false);
runner.run(1, true, true);
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 0);
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_RETRY, 0);
@ -205,6 +204,71 @@ public class PutElasticsearchHttpRecordIT {
});
}
@Test
public void testIllegalIndexName() throws Exception {
// Undo some stuff from setup()
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people\"test");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "person");
recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
put("name", "John Doe");
put("age", 48);
put("sport", null);
}}));
List<Map<String, String>> attrs = new ArrayList<>();
Map<String, String> attr = new HashMap<>();
attr.put("doc_id", "1");
attrs.add(attr);
runner.enqueue("");
runner.run(1, true, true);
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_RETRY, 0);
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 0);
}
@Test
public void testIndexNameWithJsonChar() throws Exception {
// Undo some stuff from setup()
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people}test");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "person");
recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
put("name", "John Doe");
put("age", 48);
put("sport", null);
}}));
List<Map<String, String>> attrs = new ArrayList<>();
Map<String, String> attr = new HashMap<>();
attr.put("doc_id", "1");
attrs.add(attr);
runner.enqueue("");
runner.run(1, true, true);
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 0);
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_RETRY, 0);
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
}
@Test
public void testTypeNameWithSpecialChars() throws Exception {
// Undo some stuff from setup()
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people_test2");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "per\"son");
recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
put("name", "John Doe");
put("age", 48);
put("sport", null);
}}));
List<Map<String, String>> attrs = new ArrayList<>();
Map<String, String> attr = new HashMap<>();
attr.put("doc_id", "1");
attrs.add(attr);
setupPut();
}
private interface SharedPostTest {
void run(Map<String, Object> p1, Map<String, Object> p2);
}

View File

@ -553,4 +553,20 @@ public class TestPutElasticsearchHttp {
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1);
}
@Test(expected = AssertionError.class)
public void testPutElasticSearchBadHostInEL() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}");
runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
runner.setProperty(PutElasticsearchHttp.TYPE, "status");
runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
runner.enqueue(docExample, new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.run(1, true, true);
}
}

View File

@ -486,6 +486,23 @@ public class TestPutElasticsearchHttpRecord {
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 100);
}
@Test(expected = AssertionError.class)
public void testPutElasticSearchBadHostInEL() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecord());
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}");
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.assertValid();
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "1");
}});
runner.run();
}
private void generateTestData() throws IOException {
final MockRecordParser parser = new MockRecordParser();