mirror of https://github.com/apache/nifi.git
NIFI-12648 Refactor components in elasticsearch bundle using current API methods
Signed-off-by: Joe Gresock <jgresock@gmail.com> This closes #8303.
This commit is contained in:
parent
ddc12b94be
commit
5ba20b87c5
|
@ -57,7 +57,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
|
|||
.displayName("Authorization Scheme")
|
||||
.description("Authorization Scheme used for optional authentication to Elasticsearch.")
|
||||
.allowableValues(AuthorizationScheme.class)
|
||||
.defaultValue(AuthorizationScheme.BASIC.getValue())
|
||||
.defaultValue(AuthorizationScheme.BASIC)
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
@ -141,7 +141,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
|
|||
.displayName("Suppress Null/Empty Values")
|
||||
.description("Specifies how the writer should handle null and empty fields (including objects and arrays)")
|
||||
.allowableValues(NEVER_SUPPRESS, ALWAYS_SUPPRESS)
|
||||
.defaultValue(ALWAYS_SUPPRESS.getValue())
|
||||
.defaultValue(ALWAYS_SUPPRESS)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
|
@ -186,7 +186,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
|
|||
.displayName("Node Selector")
|
||||
.description("Selects Elasticsearch nodes that can receive requests. Used to keep requests away from dedicated Elasticsearch master nodes")
|
||||
.allowableValues(NODE_SELECTOR_ANY, NODE_SELECTOR_SKIP_DEDICATED_MASTERS)
|
||||
.defaultValue(NODE_SELECTOR_ANY.getValue())
|
||||
.defaultValue(NODE_SELECTOR_ANY)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
|
|
|
@ -104,7 +104,10 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
|
|||
|
||||
private ObjectMapper mapper;
|
||||
|
||||
private static final List<PropertyDescriptor> properties;
|
||||
private static final List<PropertyDescriptor> properties = List.of(HTTP_HOSTS, PATH_PREFIX, AUTHORIZATION_SCHEME, USERNAME, PASSWORD, API_KEY_ID, API_KEY,
|
||||
PROP_SSL_CONTEXT_SERVICE, PROXY_CONFIGURATION_SERVICE, CONNECT_TIMEOUT, SOCKET_TIMEOUT, CHARSET,
|
||||
SUPPRESS_NULLS, COMPRESSION, SEND_META_HEADER, STRICT_DEPRECATION, NODE_SELECTOR, SNIFF_CLUSTER_NODES,
|
||||
SNIFFER_INTERVAL, SNIFFER_REQUEST_TIMEOUT, SNIFF_ON_FAILURE, SNIFFER_FAILURE_DELAY);
|
||||
|
||||
private RestClient client;
|
||||
|
||||
|
@ -114,13 +117,6 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
|
|||
private Charset responseCharset;
|
||||
private ObjectWriter prettyPrintWriter;
|
||||
|
||||
static {
|
||||
properties = List.of(HTTP_HOSTS, PATH_PREFIX, AUTHORIZATION_SCHEME, USERNAME, PASSWORD, API_KEY_ID, API_KEY,
|
||||
PROP_SSL_CONTEXT_SERVICE, PROXY_CONFIGURATION_SERVICE, CONNECT_TIMEOUT, SOCKET_TIMEOUT, CHARSET,
|
||||
SUPPRESS_NULLS, COMPRESSION, SEND_META_HEADER, STRICT_DEPRECATION, NODE_SELECTOR, SNIFF_CLUSTER_NODES,
|
||||
SNIFFER_INTERVAL, SNIFFER_REQUEST_TIMEOUT, SNIFF_ON_FAILURE, SNIFFER_FAILURE_DELAY);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return properties;
|
||||
|
@ -141,7 +137,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
|
|||
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
|
||||
final List<ValidationResult> results = new ArrayList<>(1);
|
||||
|
||||
final AuthorizationScheme authorizationScheme = AuthorizationScheme.valueOf(validationContext.getProperty(AUTHORIZATION_SCHEME).getValue());
|
||||
final AuthorizationScheme authorizationScheme = validationContext.getProperty(AUTHORIZATION_SCHEME).asAllowableValue(AuthorizationScheme.class);
|
||||
|
||||
final boolean usernameSet = validationContext.getProperty(USERNAME).isSet();
|
||||
final boolean passwordSet = validationContext.getProperty(PASSWORD).isSet();
|
||||
|
@ -394,8 +390,8 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
|
|||
private HttpHost[] getHttpHosts(final ConfigurationContext context) throws MalformedURLException {
|
||||
final String hosts = context.getProperty(HTTP_HOSTS).evaluateAttributeExpressions().getValue();
|
||||
|
||||
final List<String> hostsSplit = Arrays.stream(hosts.split(",\\s*")).map(String::trim).collect(Collectors.toList());
|
||||
this.url = hostsSplit.get(0);
|
||||
final List<String> hostsSplit = Arrays.stream(hosts.split(",\\s*")).map(String::trim).toList();
|
||||
this.url = hostsSplit.getFirst();
|
||||
final List<HttpHost> hh = new ArrayList<>(hostsSplit.size());
|
||||
for (final String host : hostsSplit) {
|
||||
final URL u = URI.create(host).toURL();
|
||||
|
@ -406,7 +402,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
|
|||
}
|
||||
|
||||
private RestClientBuilder addAuthAndProxy(final ConfigurationContext context, final RestClientBuilder builder) throws InitializationException {
|
||||
final AuthorizationScheme authorizationScheme = AuthorizationScheme.valueOf(context.getProperty(AUTHORIZATION_SCHEME).getValue());
|
||||
final AuthorizationScheme authorizationScheme = context.getProperty(AUTHORIZATION_SCHEME).asAllowableValue(AuthorizationScheme.class);
|
||||
|
||||
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
|
||||
final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
|
||||
|
@ -976,7 +972,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
|
|||
private String getSearchAfter(final List<Map<String, Object>> hits) throws JsonProcessingException {
|
||||
String searchAfter = null;
|
||||
if (!hits.isEmpty()) {
|
||||
final Object lastHitSort = hits.get(hits.size() - 1).get("sort");
|
||||
final Object lastHitSort = hits.getLast().get("sort");
|
||||
if (lastHitSort != null && !"null".equalsIgnoreCase(lastHitSort.toString())) {
|
||||
searchAfter = mapper.writeValueAsString(lastHitSort);
|
||||
}
|
||||
|
|
|
@ -48,6 +48,7 @@ import java.util.Optional;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@CapabilityDescription("Lookup a record from Elasticsearch Server associated with the specified document ID. " +
|
||||
"The coordinates that are passed to the lookup must contain the key 'id'.")
|
||||
|
@ -91,11 +92,10 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi
|
|||
private final List<PropertyDescriptor> descriptors;
|
||||
|
||||
public ElasticSearchLookupService() {
|
||||
final List<PropertyDescriptor> desc = new ArrayList<>(super.getSupportedPropertyDescriptors());
|
||||
desc.add(CLIENT_SERVICE);
|
||||
desc.add(INDEX);
|
||||
desc.add(TYPE);
|
||||
descriptors = Collections.unmodifiableList(desc);
|
||||
descriptors = Stream.concat(
|
||||
super.getSupportedPropertyDescriptors().stream(),
|
||||
Stream.of(CLIENT_SERVICE, INDEX, TYPE)
|
||||
).toList();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -108,7 +108,7 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi
|
|||
|
||||
final List<PropertyDescriptor> dynamicDescriptors = context.getProperties().keySet().stream()
|
||||
.filter(PropertyDescriptor::isDynamic)
|
||||
.collect(Collectors.toList());
|
||||
.toList();
|
||||
|
||||
final Map<String, RecordPath> tempRecordPathMappings = new HashMap<>();
|
||||
for (final PropertyDescriptor desc : dynamicDescriptors) {
|
||||
|
|
|
@ -43,8 +43,8 @@ abstract class AbstractElasticsearch_IT extends AbstractElasticsearchITBase {
|
|||
runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, elasticsearchHost);
|
||||
runner.setProperty(service, ElasticSearchClientService.CONNECT_TIMEOUT, "10000");
|
||||
runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000");
|
||||
runner.setProperty(service, ElasticSearchClientService.SUPPRESS_NULLS, ElasticSearchClientService.ALWAYS_SUPPRESS.getValue());
|
||||
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.BASIC.getValue());
|
||||
runner.setProperty(service, ElasticSearchClientService.SUPPRESS_NULLS, ElasticSearchClientService.ALWAYS_SUPPRESS);
|
||||
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.BASIC);
|
||||
runner.setProperty(service, ElasticSearchClientService.USERNAME, "elastic");
|
||||
runner.setProperty(service, ElasticSearchClientService.PASSWORD, ELASTIC_USER_PASSWORD);
|
||||
runner.removeProperty(service, ElasticSearchClientService.API_KEY);
|
||||
|
@ -55,7 +55,7 @@ abstract class AbstractElasticsearch_IT extends AbstractElasticsearchITBase {
|
|||
runner.setProperty(service, ElasticSearchClientService.SNIFF_CLUSTER_NODES, "false");
|
||||
runner.setProperty(service, ElasticSearchClientService.SNIFF_ON_FAILURE, "false");
|
||||
runner.removeProperty(service, ElasticSearchClientService.PATH_PREFIX);
|
||||
runner.setProperty(service, ElasticSearchClientService.NODE_SELECTOR, ElasticSearchClientService.NODE_SELECTOR_ANY.getValue());
|
||||
runner.setProperty(service, ElasticSearchClientService.NODE_SELECTOR, ElasticSearchClientService.NODE_SELECTOR_ANY);
|
||||
|
||||
runner.enableControllerService(service);
|
||||
|
||||
|
|
|
@ -125,7 +125,7 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
|
|||
final Pair<String, String> apiKey = createApiKeyForIndex();
|
||||
|
||||
runner.disableControllerService(service);
|
||||
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY.getValue());
|
||||
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY);
|
||||
runner.removeProperty(service, ElasticSearchClientService.USERNAME);
|
||||
runner.removeProperty(service, ElasticSearchClientService.PASSWORD);
|
||||
runner.setProperty(service, ElasticSearchClientService.API_KEY_ID, apiKey.getKey());
|
||||
|
@ -216,7 +216,7 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
|
|||
@Test
|
||||
void testVerifyFailedApiKeyAuth() {
|
||||
runner.disableControllerService(service);
|
||||
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY.getValue());
|
||||
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY);
|
||||
runner.removeProperty(service, ElasticSearchClientService.USERNAME);
|
||||
runner.removeProperty(service, ElasticSearchClientService.PASSWORD);
|
||||
runner.setProperty(service, ElasticSearchClientService.API_KEY_ID, "invalid");
|
||||
|
@ -691,7 +691,7 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
|
|||
@Test
|
||||
void testNodeSelector() {
|
||||
runner.disableControllerService(service);
|
||||
runner.setProperty(service, ElasticSearchClientService.NODE_SELECTOR, ElasticSearchClientService.NODE_SELECTOR_SKIP_DEDICATED_MASTERS.getValue());
|
||||
runner.setProperty(service, ElasticSearchClientService.NODE_SELECTOR, ElasticSearchClientService.NODE_SELECTOR_SKIP_DEDICATED_MASTERS);
|
||||
runner.enableControllerService(service);
|
||||
runner.assertValid(service);
|
||||
|
||||
|
|
|
@ -40,6 +40,7 @@ import java.util.Map;
|
|||
import java.util.Optional;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
|
@ -121,7 +122,7 @@ class ElasticSearchLookupService_IT extends AbstractElasticsearch_IT {
|
|||
}
|
||||
|
||||
assertNotNull(exception);
|
||||
assertTrue(exception instanceof LookupFailureException);
|
||||
assertInstanceOf(LookupFailureException.class, exception);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -59,7 +59,7 @@ class ElasticSearchClientServiceImplTest {
|
|||
final String index = "test";
|
||||
final String type = "no-type";
|
||||
|
||||
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.NONE.getValue());
|
||||
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.NONE);
|
||||
runner.assertValid(service);
|
||||
runner.enableControllerService(service);
|
||||
|
||||
|
@ -69,7 +69,7 @@ class ElasticSearchClientServiceImplTest {
|
|||
|
||||
@Test
|
||||
void testValidateBasicAuth() {
|
||||
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.BASIC.getValue());
|
||||
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.BASIC);
|
||||
runner.setProperty(service, ElasticSearchClientService.USERNAME, "elastic");
|
||||
runner.setProperty(service, ElasticSearchClientService.PASSWORD, "password");
|
||||
runner.assertValid(service);
|
||||
|
@ -87,7 +87,7 @@ class ElasticSearchClientServiceImplTest {
|
|||
|
||||
@Test
|
||||
void testValidateApiKeyAuth() {
|
||||
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY.getValue());
|
||||
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY);
|
||||
runner.setProperty(service, ElasticSearchClientService.API_KEY_ID, "api-key-id");
|
||||
runner.setProperty(service, ElasticSearchClientService.API_KEY, "api-key");
|
||||
runner.assertValid(service);
|
||||
|
@ -105,7 +105,7 @@ class ElasticSearchClientServiceImplTest {
|
|||
|
||||
@Test
|
||||
void testValidatePkiAuth() throws InitializationException {
|
||||
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.PKI.getValue());
|
||||
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.PKI);
|
||||
|
||||
final SSLContextService sslService = mock(SSLContextService.class);
|
||||
when(sslService.getIdentifier()).thenReturn("ssl-context");
|
||||
|
|
|
@ -38,7 +38,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
|||
public class ElasticSearchStringLookupServiceTest {
|
||||
private ElasticSearchClientService mockClientService;
|
||||
private ElasticSearchStringLookupService lookupService;
|
||||
private TestRunner runner;
|
||||
|
||||
private static final ObjectMapper MAPPER = new ObjectMapper();
|
||||
|
||||
|
@ -46,7 +45,7 @@ public class ElasticSearchStringLookupServiceTest {
|
|||
public void setup() throws Exception {
|
||||
mockClientService = new TestElasticSearchClientService();
|
||||
lookupService = new ElasticSearchStringLookupService();
|
||||
runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class);
|
||||
TestRunner runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class);
|
||||
runner.addControllerService("clientService", mockClientService);
|
||||
runner.addControllerService("lookupService", lookupService);
|
||||
runner.enableControllerService(mockClientService);
|
||||
|
|
|
@ -32,10 +32,7 @@ import org.apache.nifi.processor.Relationship;
|
|||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -43,38 +40,27 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
|
||||
public abstract class AbstractByQueryElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
|
||||
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
|
||||
.description("If the \"by query\" operation fails, and a flowfile was read, it will be sent to this relationship.")
|
||||
.build();
|
||||
.description("If the \"by query\" operation fails, and a flowfile was read, it will be sent to this relationship.")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
|
||||
.description("If the \"by query\" operation succeeds, and a flowfile was read, it will be sent to this relationship.")
|
||||
.build();
|
||||
.description("If the \"by query\" operation succeeds, and a flowfile was read, it will be sent to this relationship.")
|
||||
.build();
|
||||
|
||||
private static final Set<Relationship> relationships;
|
||||
static final List<PropertyDescriptor> byQueryPropertyDescriptors;
|
||||
private static final Set<Relationship> relationships = Set.of(REL_SUCCESS, REL_FAILURE, REL_RETRY);
|
||||
static final List<PropertyDescriptor> byQueryPropertyDescriptors = List.of(
|
||||
QUERY_DEFINITION_STYLE,
|
||||
QUERY,
|
||||
QUERY_CLAUSE,
|
||||
SCRIPT,
|
||||
QUERY_ATTRIBUTE,
|
||||
INDEX,
|
||||
TYPE,
|
||||
CLIENT_SERVICE
|
||||
);
|
||||
|
||||
private final AtomicReference<ElasticSearchClientService> clientService = new AtomicReference<>(null);
|
||||
|
||||
static {
|
||||
final Set<Relationship> rels = new HashSet<>();
|
||||
rels.add(REL_SUCCESS);
|
||||
rels.add(REL_FAILURE);
|
||||
rels.add(REL_RETRY);
|
||||
relationships = Collections.unmodifiableSet(rels);
|
||||
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||
descriptors.add(QUERY_DEFINITION_STYLE);
|
||||
descriptors.add(QUERY);
|
||||
descriptors.add(QUERY_CLAUSE);
|
||||
descriptors.add(SCRIPT);
|
||||
descriptors.add(QUERY_ATTRIBUTE);
|
||||
descriptors.add(INDEX);
|
||||
descriptors.add(TYPE);
|
||||
descriptors.add(CLIENT_SERVICE);
|
||||
|
||||
byQueryPropertyDescriptors = Collections.unmodifiableList(descriptors);
|
||||
}
|
||||
|
||||
abstract String getTookAttribute();
|
||||
|
||||
abstract String getErrorAttribute();
|
||||
|
@ -133,7 +119,7 @@ public abstract class AbstractByQueryElasticsearch extends AbstractProcessor imp
|
|||
try {
|
||||
final String query = getQuery(input, context, session);
|
||||
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
|
||||
final String type = context.getProperty(TYPE).isSet()
|
||||
final String type = context.getProperty(TYPE).isSet()
|
||||
? context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue()
|
||||
: null;
|
||||
final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).isSet()
|
||||
|
|
|
@ -41,7 +41,6 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -65,7 +64,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
|
|||
.displayName("Search Results Split")
|
||||
.description("Output a flowfile containing all hits or one flowfile for each individual hit.")
|
||||
.allowableValues(ResultOutputStrategy.getNonPaginatedResponseOutputStrategies())
|
||||
.defaultValue(ResultOutputStrategy.PER_RESPONSE.getValue())
|
||||
.defaultValue(ResultOutputStrategy.PER_RESPONSE)
|
||||
.required(true)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.build();
|
||||
|
@ -75,7 +74,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
|
|||
.displayName("Search Results Format")
|
||||
.description("Format of Hits output.")
|
||||
.allowableValues(SearchResultsFormat.class)
|
||||
.defaultValue(SearchResultsFormat.FULL.getValue())
|
||||
.defaultValue(SearchResultsFormat.FULL)
|
||||
.required(true)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.build();
|
||||
|
@ -84,7 +83,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
|
|||
.displayName("Aggregation Results Split")
|
||||
.description("Output a flowfile containing all aggregations or one flowfile for each individual aggregation.")
|
||||
.allowableValues(ResultOutputStrategy.getNonPaginatedResponseOutputStrategies())
|
||||
.defaultValue(ResultOutputStrategy.PER_RESPONSE.getValue())
|
||||
.defaultValue(ResultOutputStrategy.PER_RESPONSE)
|
||||
.required(true)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.build();
|
||||
|
@ -94,7 +93,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
|
|||
.displayName("Aggregation Results Format")
|
||||
.description("Format of Aggregation output.")
|
||||
.allowableValues(AggregationResultsFormat.class)
|
||||
.defaultValue(AggregationResultsFormat.FULL.getValue())
|
||||
.defaultValue(AggregationResultsFormat.FULL)
|
||||
.required(true)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.build();
|
||||
|
@ -110,8 +109,26 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
|
|||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.build();
|
||||
|
||||
private static final Set<Relationship> relationships;
|
||||
static final List<PropertyDescriptor> queryPropertyDescriptors;
|
||||
private static final Set<Relationship> relationships = Set.of(REL_ORIGINAL, REL_FAILURE, REL_HITS, REL_AGGREGATIONS);
|
||||
static final List<PropertyDescriptor> queryPropertyDescriptors = List.of(
|
||||
QUERY_DEFINITION_STYLE,
|
||||
QUERY,
|
||||
QUERY_CLAUSE,
|
||||
SIZE,
|
||||
SORT,
|
||||
AGGREGATIONS,
|
||||
FIELDS,
|
||||
SCRIPT_FIELDS,
|
||||
QUERY_ATTRIBUTE,
|
||||
INDEX,
|
||||
TYPE,
|
||||
CLIENT_SERVICE,
|
||||
SEARCH_RESULTS_SPLIT,
|
||||
SEARCH_RESULTS_FORMAT,
|
||||
AGGREGATION_RESULTS_SPLIT,
|
||||
AGGREGATION_RESULTS_FORMAT,
|
||||
OUTPUT_NO_HITS
|
||||
);
|
||||
|
||||
ResultOutputStrategy hitStrategy;
|
||||
private SearchResultsFormat hitFormat;
|
||||
|
@ -121,36 +138,6 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
|
|||
|
||||
final AtomicReference<ElasticSearchClientService> clientService = new AtomicReference<>(null);
|
||||
|
||||
static {
|
||||
final Set<Relationship> rels = new HashSet<>();
|
||||
rels.add(REL_ORIGINAL);
|
||||
rels.add(REL_FAILURE);
|
||||
rels.add(REL_HITS);
|
||||
rels.add(REL_AGGREGATIONS);
|
||||
relationships = Collections.unmodifiableSet(rels);
|
||||
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||
descriptors.add(QUERY_DEFINITION_STYLE);
|
||||
descriptors.add(QUERY);
|
||||
descriptors.add(QUERY_CLAUSE);
|
||||
descriptors.add(SIZE);
|
||||
descriptors.add(SORT);
|
||||
descriptors.add(AGGREGATIONS);
|
||||
descriptors.add(FIELDS);
|
||||
descriptors.add(SCRIPT_FIELDS);
|
||||
descriptors.add(QUERY_ATTRIBUTE);
|
||||
descriptors.add(INDEX);
|
||||
descriptors.add(TYPE);
|
||||
descriptors.add(CLIENT_SERVICE);
|
||||
descriptors.add(SEARCH_RESULTS_SPLIT);
|
||||
descriptors.add(SEARCH_RESULTS_FORMAT);
|
||||
descriptors.add(AGGREGATION_RESULTS_SPLIT);
|
||||
descriptors.add(AGGREGATION_RESULTS_FORMAT);
|
||||
descriptors.add(OUTPUT_NO_HITS);
|
||||
|
||||
queryPropertyDescriptors = Collections.unmodifiableList(descriptors);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return relationships;
|
||||
|
@ -185,10 +172,10 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
|
|||
public void onScheduled(final ProcessContext context) {
|
||||
clientService.set(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
|
||||
|
||||
hitStrategy = ResultOutputStrategy.fromValue(context.getProperty(SEARCH_RESULTS_SPLIT).getValue());
|
||||
hitFormat = SearchResultsFormat.valueOf(context.getProperty(SEARCH_RESULTS_FORMAT).getValue());
|
||||
aggregationStrategy = context.getProperty(AGGREGATION_RESULTS_SPLIT).isSet() ? ResultOutputStrategy.fromValue(context.getProperty(AGGREGATION_RESULTS_SPLIT).getValue()) : null;
|
||||
aggregationFormat = context.getProperty(AGGREGATION_RESULTS_FORMAT).isSet() ? AggregationResultsFormat.valueOf(context.getProperty(AGGREGATION_RESULTS_FORMAT).getValue()) : null;
|
||||
hitStrategy = context.getProperty(SEARCH_RESULTS_SPLIT).asAllowableValue(ResultOutputStrategy.class);
|
||||
hitFormat = context.getProperty(SEARCH_RESULTS_FORMAT).asAllowableValue(SearchResultsFormat.class);
|
||||
aggregationStrategy = context.getProperty(AGGREGATION_RESULTS_SPLIT).isSet() ? context.getProperty(AGGREGATION_RESULTS_SPLIT).asAllowableValue(ResultOutputStrategy.class) : null;
|
||||
aggregationFormat = context.getProperty(AGGREGATION_RESULTS_FORMAT).isSet() ? context.getProperty(AGGREGATION_RESULTS_FORMAT).asAllowableValue(AggregationResultsFormat.class) : null;
|
||||
|
||||
outputNoHits = context.getProperty(OUTPUT_NO_HITS).asBoolean();
|
||||
}
|
||||
|
@ -267,7 +254,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
|
|||
final ProcessSession session, final FlowFile aggFlowFile,
|
||||
final Map<String, String> attributes) {
|
||||
FlowFile ff = session.write(aggFlowFile, out -> out.write(json.getBytes()));
|
||||
ff = session.putAllAttributes(ff, new HashMap<String, String>(){{
|
||||
ff = session.putAllAttributes(ff, new HashMap<String, String>() {{
|
||||
if (name != null) {
|
||||
put("aggregation.name", name);
|
||||
}
|
||||
|
@ -312,11 +299,11 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
|
|||
|
||||
if (aggregationFormat == AggregationResultsFormat.METADATA_ONLY) {
|
||||
formattedAggregations = new LinkedHashMap<>(aggregations);
|
||||
formattedAggregations.forEach((k, v) -> ((Map<String, Object>)v).remove("buckets"));
|
||||
formattedAggregations.forEach((k, v) -> ((Map<String, Object>) v).remove("buckets"));
|
||||
} else if (aggregationFormat == AggregationResultsFormat.BUCKETS_ONLY) {
|
||||
formattedAggregations = aggregations.entrySet().stream().collect(Collectors.toMap(
|
||||
Map.Entry::getKey,
|
||||
e -> ((Map<String, Object>)e.getValue()).get("buckets"),
|
||||
e -> ((Map<String, Object>) e.getValue()).get("buckets"),
|
||||
(k1, k2) -> k1,
|
||||
LinkedHashMap::new
|
||||
));
|
||||
|
@ -328,7 +315,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
|
|||
}
|
||||
|
||||
FlowFile writeHitFlowFile(final int count, final String json, final ProcessSession session,
|
||||
final FlowFile hitFlowFile, final Map<String, String> attributes) {
|
||||
final FlowFile hitFlowFile, final Map<String, String> attributes) {
|
||||
final FlowFile ff = session.write(hitFlowFile, out -> out.write(json.getBytes()));
|
||||
attributes.put("hit.count", Integer.toString(count));
|
||||
|
||||
|
|
|
@ -37,12 +37,10 @@ import org.apache.nifi.util.StringUtils;
|
|||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJsonQueryElasticsearch<PaginatedJsonQueryParameters> {
|
||||
public static final PropertyDescriptor SEARCH_RESULTS_SPLIT = new PropertyDescriptor.Builder()
|
||||
|
@ -58,7 +56,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
|
|||
.description("Pagination method to use. Not all types are available for all Elasticsearch versions, " +
|
||||
"check the Elasticsearch docs to confirm which are applicable and recommended for your service.")
|
||||
.allowableValues(PaginationType.class)
|
||||
.defaultValue(PaginationType.SCROLL.getValue())
|
||||
.defaultValue(PaginationType.SCROLL)
|
||||
.required(true)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.build();
|
||||
|
@ -75,18 +73,11 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
|
|||
.addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, 24, TimeUnit.HOURS))
|
||||
.build();
|
||||
|
||||
static final List<PropertyDescriptor> paginatedPropertyDescriptors;
|
||||
|
||||
static {
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>(
|
||||
// replace SEARCH_RESULTS_SPLIT property to allow additional output strategies
|
||||
queryPropertyDescriptors.stream().map(pd -> AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT.equals(pd) ? SEARCH_RESULTS_SPLIT : pd).collect(Collectors.toList())
|
||||
);
|
||||
descriptors.add(PAGINATION_TYPE);
|
||||
descriptors.add(PAGINATION_KEEP_ALIVE);
|
||||
|
||||
paginatedPropertyDescriptors = Collections.unmodifiableList(descriptors);
|
||||
}
|
||||
static final List<PropertyDescriptor> paginatedPropertyDescriptors = Stream.concat(
|
||||
// replace SEARCH_RESULTS_SPLIT property to allow additional output strategies
|
||||
queryPropertyDescriptors.stream().map(pd -> AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT.equals(pd) ? SEARCH_RESULTS_SPLIT : pd),
|
||||
Stream.of(PAGINATION_TYPE, PAGINATION_KEEP_ALIVE)
|
||||
).toList();
|
||||
|
||||
// output as newline delimited JSON (allows for multiple pages of results to be appended to existing FlowFiles without retaining all hits in memory)
|
||||
private final ObjectWriter writer = mapper.writer().withRootValueSeparator("\n");
|
||||
|
@ -98,7 +89,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
|
|||
public void onScheduled(final ProcessContext context) {
|
||||
super.onScheduled(context);
|
||||
|
||||
paginationType = PaginationType.fromValue(context.getProperty(PAGINATION_TYPE).getValue());
|
||||
paginationType = context.getProperty(PAGINATION_TYPE).asAllowableValue(PaginationType.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -242,7 +233,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
|
|||
final FlowFile hitFlowFile;
|
||||
final boolean append = !hitsFlowFiles.isEmpty();
|
||||
if (!hitsFlowFiles.isEmpty()) {
|
||||
hitFlowFile = hitsFlowFiles.remove(0);
|
||||
hitFlowFile = hitsFlowFiles.removeFirst();
|
||||
} else {
|
||||
hitFlowFile = createChildFlowFile(session, parent);
|
||||
}
|
||||
|
|
|
@ -42,7 +42,6 @@ import org.apache.nifi.record.path.validation.RecordPathValidator;
|
|||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
|
@ -53,6 +52,7 @@ import java.util.Set;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public abstract class AbstractPutElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
|
||||
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
|
||||
|
@ -66,14 +66,14 @@ public abstract class AbstractPutElasticsearch extends AbstractProcessor impleme
|
|||
.build();
|
||||
|
||||
static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
|
||||
.name("put-es-record-index-op")
|
||||
.displayName("Index Operation")
|
||||
.description("The type of the operation used to index (create, delete, index, update, upsert)")
|
||||
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.defaultValue(IndexOperationRequest.Operation.Index.getValue())
|
||||
.required(true)
|
||||
.build();
|
||||
.name("put-es-record-index-op")
|
||||
.displayName("Index Operation")
|
||||
.description("The type of the operation used to index (create, delete, index, update, upsert)")
|
||||
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.defaultValue(IndexOperationRequest.Operation.Index.getValue())
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor OUTPUT_ERROR_RESPONSES = new PropertyDescriptor.Builder()
|
||||
.name("put-es-output-error-responses")
|
||||
|
@ -98,13 +98,9 @@ public abstract class AbstractPutElasticsearch extends AbstractProcessor impleme
|
|||
"(and optionally \"not_found\" when \"Treat \"Not Found\" as Error\" is \"true\").")
|
||||
.build();
|
||||
|
||||
static final List<String> ALLOWED_INDEX_OPERATIONS = Collections.unmodifiableList(Arrays.asList(
|
||||
IndexOperationRequest.Operation.Create.getValue().toLowerCase(),
|
||||
IndexOperationRequest.Operation.Delete.getValue().toLowerCase(),
|
||||
IndexOperationRequest.Operation.Index.getValue().toLowerCase(),
|
||||
IndexOperationRequest.Operation.Update.getValue().toLowerCase(),
|
||||
IndexOperationRequest.Operation.Upsert.getValue().toLowerCase()
|
||||
));
|
||||
static final List<String> ALLOWED_INDEX_OPERATIONS = Stream.of(IndexOperationRequest.Operation.values())
|
||||
.map(operation -> operation.getValue().toLowerCase())
|
||||
.toList();
|
||||
|
||||
private final AtomicReference<Set<Relationship>> relationships = new AtomicReference<>(getBaseRelationships());
|
||||
|
||||
|
@ -203,7 +199,7 @@ public abstract class AbstractPutElasticsearch extends AbstractProcessor impleme
|
|||
|
||||
@Override
|
||||
public List<ConfigVerificationResult> verifyAfterIndex(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes,
|
||||
final ElasticSearchClientService verifyClientService, final String index, final boolean indexExists) {
|
||||
final ElasticSearchClientService verifyClientService, final String index, final boolean indexExists) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
|
|
|
@ -52,7 +52,7 @@ import java.util.Collections;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute = "mime.type", description = "application/json"),
|
||||
|
@ -63,7 +63,7 @@ import java.util.stream.Collectors;
|
|||
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
|
||||
@TriggerSerially
|
||||
@PrimaryNodeOnly
|
||||
@DefaultSchedule(period="1 min")
|
||||
@DefaultSchedule(period = "1 min")
|
||||
@Tags({"elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "elasticsearch8", "query", "scroll", "page", "search", "json"})
|
||||
@CapabilityDescription("A processor that repeatedly runs a paginated query against a field using a Range query to consume new Documents from an Elasticsearch index/query. " +
|
||||
"The processor will retrieve multiple pages of results until either no more results are available or the Pagination Keep Alive expiration is reached, " +
|
||||
|
@ -174,29 +174,19 @@ public class ConsumeElasticsearch extends SearchElasticsearch {
|
|||
.required(false)
|
||||
.build();
|
||||
|
||||
private static final List<PropertyDescriptor> propertyDescriptors;
|
||||
|
||||
static {
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||
descriptors.add(RANGE_FIELD);
|
||||
descriptors.add(RANGE_FIELD_SORT_ORDER);
|
||||
descriptors.add(RANGE_INITIAL_VALUE);
|
||||
descriptors.add(RANGE_DATE_FORMAT);
|
||||
descriptors.add(RANGE_TIME_ZONE);
|
||||
descriptors.add(ADDITIONAL_FILTERS);
|
||||
descriptors.addAll(scrollPropertyDescriptors.stream()
|
||||
.filter(pd -> !QUERY.equals(pd) && !QUERY_CLAUSE.equals(pd) && !QUERY_DEFINITION_STYLE.equals(pd))
|
||||
.collect(Collectors.toList()));
|
||||
|
||||
// replace Query Builder properties with updated version without the property dependencies that are invalid for ConsumeElasticsearch
|
||||
descriptors.set(descriptors.indexOf(ElasticsearchRestProcessor.SIZE), ConsumeElasticsearch.SIZE);
|
||||
descriptors.set(descriptors.indexOf(ElasticsearchRestProcessor.AGGREGATIONS), ConsumeElasticsearch.AGGREGATIONS);
|
||||
descriptors.set(descriptors.indexOf(ElasticsearchRestProcessor.SORT), ConsumeElasticsearch.SORT);
|
||||
descriptors.set(descriptors.indexOf(ElasticsearchRestProcessor.FIELDS), ConsumeElasticsearch.FIELDS);
|
||||
descriptors.set(descriptors.indexOf(ElasticsearchRestProcessor.SCRIPT_FIELDS), ConsumeElasticsearch.SCRIPT_FIELDS);
|
||||
|
||||
propertyDescriptors = Collections.unmodifiableList(descriptors);
|
||||
}
|
||||
private static final List<PropertyDescriptor> propertyDescriptors = Stream.concat(
|
||||
Stream.of(RANGE_FIELD, RANGE_FIELD_SORT_ORDER, RANGE_INITIAL_VALUE, RANGE_DATE_FORMAT, RANGE_TIME_ZONE, ADDITIONAL_FILTERS),
|
||||
scrollPropertyDescriptors.stream()
|
||||
.filter(pd -> !QUERY.equals(pd) && !QUERY_CLAUSE.equals(pd) && !QUERY_DEFINITION_STYLE.equals(pd))
|
||||
.map(property -> {
|
||||
if (property == ElasticsearchRestProcessor.SIZE) return SIZE;
|
||||
if (property == ElasticsearchRestProcessor.AGGREGATIONS) return AGGREGATIONS;
|
||||
if (property == ElasticsearchRestProcessor.SORT) return SORT;
|
||||
if (property == ElasticsearchRestProcessor.FIELDS) return FIELDS;
|
||||
if (property == ElasticsearchRestProcessor.SCRIPT_FIELDS) return SCRIPT_FIELDS;
|
||||
return property;
|
||||
})
|
||||
).toList();
|
||||
|
||||
protected String trackingRangeField;
|
||||
protected String trackingSortOrder;
|
||||
|
@ -285,9 +275,11 @@ public class ConsumeElasticsearch extends SearchElasticsearch {
|
|||
if (context.getProperty(ADDITIONAL_FILTERS).isSet()) {
|
||||
final JsonNode additionalFilters = mapper.readTree(context.getProperty(ADDITIONAL_FILTERS).getValue());
|
||||
if (additionalFilters.isArray()) {
|
||||
filters.addAll(mapper.convertValue(additionalFilters, new TypeReference<List<Map<String, Object>>>() {}));
|
||||
filters.addAll(mapper.convertValue(additionalFilters, new TypeReference<List<Map<String, Object>>>() {
|
||||
}));
|
||||
} else {
|
||||
filters.add(mapper.convertValue(additionalFilters, new TypeReference<Map<String, Object>>() {}));
|
||||
filters.add(mapper.convertValue(additionalFilters, new TypeReference<Map<String, Object>>() {
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -311,7 +303,7 @@ public class ConsumeElasticsearch extends SearchElasticsearch {
|
|||
}
|
||||
|
||||
if (sort.stream().noneMatch(s -> s.containsKey(getTrackingRangeField(context)))) {
|
||||
sort.add(0, Collections.singletonMap(getTrackingRangeField(context), getTrackingSortOrder(context)));
|
||||
sort.addFirst(Collections.singletonMap(getTrackingRangeField(context), getTrackingSortOrder(context)));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -335,8 +327,7 @@ public class ConsumeElasticsearch extends SearchElasticsearch {
|
|||
return;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
final String nextValue = String.valueOf(((Map<String, Object>) response.getHits().get(trackingHitIndex).get("_source"))
|
||||
@SuppressWarnings("unchecked") final String nextValue = String.valueOf(((Map<String, Object>) response.getHits().get(trackingHitIndex).get("_source"))
|
||||
.get(getTrackingRangeField(null)));
|
||||
if (StringUtils.isNotBlank(nextValue)) {
|
||||
paginatedJsonQueryParameters.setTrackingRangeValue(nextValue);
|
||||
|
|
|
@ -28,11 +28,8 @@ import org.apache.nifi.elasticsearch.ElasticSearchClientService;
|
|||
import org.apache.nifi.elasticsearch.OperationResponse;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute = "elasticsearch.delete.took", description = "The amount of time that it took to complete the delete operation in ms."),
|
||||
|
@ -52,15 +49,8 @@ public class DeleteByQueryElasticsearch extends AbstractByQueryElasticsearch {
|
|||
static final String TOOK_ATTRIBUTE = "elasticsearch.delete.took";
|
||||
static final String ERROR_ATTRIBUTE = "elasticsearch.delete.error";
|
||||
|
||||
private static final List<PropertyDescriptor> propertyDescriptors;
|
||||
|
||||
static {
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>(
|
||||
byQueryPropertyDescriptors.stream().filter(pd -> !SCRIPT.equals(pd)).collect(Collectors.toList())
|
||||
);
|
||||
|
||||
propertyDescriptors = Collections.unmodifiableList(descriptors);
|
||||
}
|
||||
private static final List<PropertyDescriptor> propertyDescriptors =
|
||||
byQueryPropertyDescriptors.stream().filter(pd -> !SCRIPT.equals(pd)).toList();
|
||||
|
||||
@Override
|
||||
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
|
|
|
@ -79,7 +79,7 @@ public interface ElasticsearchRestProcessor extends Processor, VerifiableProcess
|
|||
.description("How the JSON Query will be defined for use by the processor.")
|
||||
.required(true)
|
||||
.allowableValues(QueryDefinitionType.class)
|
||||
.defaultValue(QueryDefinitionType.FULL_QUERY.getValue())
|
||||
.defaultValue(QueryDefinitionType.FULL_QUERY)
|
||||
.build();
|
||||
|
||||
PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
|
||||
|
@ -217,7 +217,7 @@ public interface ElasticsearchRestProcessor extends Processor, VerifiableProcess
|
|||
default String getQuery(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException {
|
||||
String retVal = getQuery(input != null ? input.getAttributes() : Collections.emptyMap(), context);
|
||||
if (DEFAULT_QUERY_JSON.equals(retVal) && input != null
|
||||
&& QueryDefinitionType.FULL_QUERY.getValue().equals(context.getProperty(QUERY_DEFINITION_STYLE).getValue())
|
||||
&& QueryDefinitionType.FULL_QUERY == context.getProperty(QUERY_DEFINITION_STYLE).asAllowableValue(QueryDefinitionType.class)
|
||||
&& !context.getProperty(QUERY).isSet()) {
|
||||
try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
|
||||
session.exportTo(input, out);
|
||||
|
@ -230,7 +230,7 @@ public interface ElasticsearchRestProcessor extends Processor, VerifiableProcess
|
|||
|
||||
default String getQuery(final Map<String, String> attributes, final ProcessContext context) throws IOException {
|
||||
final String retVal;
|
||||
if (QueryDefinitionType.FULL_QUERY.getValue().equals(context.getProperty(QUERY_DEFINITION_STYLE).getValue())) {
|
||||
if (QueryDefinitionType.FULL_QUERY == context.getProperty(QUERY_DEFINITION_STYLE).asAllowableValue(QueryDefinitionType.class)) {
|
||||
if (context.getProperty(QUERY).isSet()) {
|
||||
retVal = context.getProperty(QUERY).evaluateAttributeExpressions(attributes).getValue();
|
||||
} else {
|
||||
|
|
|
@ -44,10 +44,7 @@ import org.apache.nifi.util.StopWatch;
|
|||
import org.apache.nifi.util.StringUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -123,12 +120,9 @@ public class GetElasticsearch extends AbstractProcessor implements Elasticsearch
|
|||
|
||||
public static final String VERIFICATION_STEP_DOCUMENT_EXISTS = "Elasticsearch Document Exists";
|
||||
|
||||
static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
|
||||
ID, INDEX, TYPE, DESTINATION, ATTRIBUTE_NAME, CLIENT_SERVICE
|
||||
));
|
||||
static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
|
||||
REL_DOC, REL_FAILURE, REL_RETRY, REL_NOT_FOUND
|
||||
)));
|
||||
static final List<PropertyDescriptor> DESCRIPTORS =
|
||||
List.of(ID, INDEX, TYPE, DESTINATION, ATTRIBUTE_NAME, CLIENT_SERVICE);
|
||||
static final Set<Relationship> RELATIONSHIPS = Set.of(REL_DOC, REL_FAILURE, REL_RETRY, REL_NOT_FOUND);
|
||||
|
||||
private final AtomicReference<ElasticSearchClientService> clientService = new AtomicReference<>(null);
|
||||
|
||||
|
|
|
@ -48,9 +48,7 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -173,13 +171,12 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
|
|||
.dependsOn(OUTPUT_ERROR_DOCUMENTS, "true")
|
||||
.build();
|
||||
|
||||
static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
|
||||
static final List<PropertyDescriptor> DESCRIPTORS = List.of(
|
||||
ID_ATTRIBUTE, INDEX_OP, INDEX, TYPE, SCRIPT, SCRIPTED_UPSERT, DYNAMIC_TEMPLATES, BATCH_SIZE, CHARSET, CLIENT_SERVICE,
|
||||
LOG_ERROR_RESPONSES, OUTPUT_ERROR_RESPONSES, OUTPUT_ERROR_DOCUMENTS, NOT_FOUND_IS_SUCCESSFUL
|
||||
));
|
||||
static final Set<Relationship> BASE_RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
|
||||
REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_DOCUMENTS
|
||||
)));
|
||||
);
|
||||
static final Set<Relationship> BASE_RELATIONSHIPS =
|
||||
Set.of(REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_DOCUMENTS);
|
||||
|
||||
private boolean outputErrors;
|
||||
|
||||
|
@ -293,7 +290,7 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
|
|||
}
|
||||
|
||||
private List<FlowFile> indexDocuments(final List<IndexOperationRequest> operations, final List<FlowFile> originals, final ProcessContext context, final ProcessSession session) throws IOException {
|
||||
final Map<String, String> dynamicProperties = getDynamicProperties(context, originals.get(0));
|
||||
final Map<String, String> dynamicProperties = getDynamicProperties(context, originals.getFirst());
|
||||
final IndexOperationResponse response = clientService.get().bulk(operations, getRequestURLParameters(dynamicProperties));
|
||||
|
||||
final Map<Integer, Map<String, Object>> errors = findElasticsearchResponseErrors(response);
|
||||
|
|
|
@ -50,12 +50,12 @@ import org.apache.nifi.record.path.RecordPathResult;
|
|||
import org.apache.nifi.record.path.util.RecordPathCache;
|
||||
import org.apache.nifi.record.path.validation.RecordPathValidator;
|
||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||
import org.apache.nifi.serialization.DateTimeFormatValidator;
|
||||
import org.apache.nifi.serialization.MalformedRecordException;
|
||||
import org.apache.nifi.serialization.RecordReader;
|
||||
import org.apache.nifi.serialization.RecordReaderFactory;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
import org.apache.nifi.serialization.DateTimeFormatValidator;
|
||||
import org.apache.nifi.serialization.record.DataType;
|
||||
import org.apache.nifi.serialization.record.PushBackRecordSet;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
|
@ -72,7 +72,6 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -83,7 +82,6 @@ import java.util.Optional;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
|
||||
@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "elasticsearch8", "put", "index", "record"})
|
||||
|
@ -337,16 +335,15 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
|
|||
.required(false)
|
||||
.build();
|
||||
|
||||
static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
|
||||
static final List<PropertyDescriptor> DESCRIPTORS = List.of(
|
||||
INDEX_OP, INDEX, TYPE, AT_TIMESTAMP, CLIENT_SERVICE, RECORD_READER, BATCH_SIZE, ID_RECORD_PATH, RETAIN_ID_FIELD,
|
||||
INDEX_OP_RECORD_PATH, INDEX_RECORD_PATH, TYPE_RECORD_PATH, AT_TIMESTAMP_RECORD_PATH, RETAIN_AT_TIMESTAMP_FIELD,
|
||||
SCRIPT_RECORD_PATH, SCRIPTED_UPSERT_RECORD_PATH, DYNAMIC_TEMPLATES_RECORD_PATH, DATE_FORMAT, TIME_FORMAT,
|
||||
TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES, OUTPUT_ERROR_RESPONSES, RESULT_RECORD_WRITER, NOT_FOUND_IS_SUCCESSFUL,
|
||||
GROUP_BULK_ERRORS_BY_TYPE
|
||||
));
|
||||
static final Set<Relationship> BASE_RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
|
||||
REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS, REL_SUCCESSFUL_RECORDS
|
||||
)));
|
||||
);
|
||||
static final Set<Relationship> BASE_RELATIONSHIPS =
|
||||
Set.of(REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS, REL_SUCCESSFUL_RECORDS);
|
||||
|
||||
private static final String OUTPUT_TYPE_SUCCESS = "success";
|
||||
private static final String OUTPUT_TYPE_ERROR = "error";
|
||||
|
@ -517,7 +514,7 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
|
|||
|
||||
successfulRecords.getAndAdd(responseDetails.getSuccessCount());
|
||||
erroredRecords.getAndAdd(responseDetails.getErrorCount());
|
||||
resultRecords.addAll(responseDetails.getOutputs().values().stream().map(Output::getFlowFile).collect(Collectors.toList()));
|
||||
resultRecords.addAll(responseDetails.getOutputs().values().stream().map(Output::getFlowFile).toList());
|
||||
|
||||
operationList.clear();
|
||||
originals.clear();
|
||||
|
@ -710,37 +707,22 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
|
|||
return null;
|
||||
}
|
||||
|
||||
final Object returnValue;
|
||||
switch (chosenDataType.getFieldType()) {
|
||||
case DATE:
|
||||
case TIME:
|
||||
case TIMESTAMP:
|
||||
final Object returnValue = switch (chosenDataType.getFieldType()) {
|
||||
case DATE, TIME, TIMESTAMP -> {
|
||||
final String format = determineDateFormat(chosenDataType.getFieldType());
|
||||
returnValue = coerceStringToLong(
|
||||
yield coerceStringToLong(
|
||||
fieldName,
|
||||
StandardFieldConverterRegistry.getRegistry().getFieldConverter(String.class).convertField(coercedValue, Optional.ofNullable(format), path.getPath())
|
||||
);
|
||||
break;
|
||||
case LONG:
|
||||
returnValue = DataTypeUtils.toLong(coercedValue, fieldName);
|
||||
break;
|
||||
case INT:
|
||||
case BYTE:
|
||||
case SHORT:
|
||||
returnValue = DataTypeUtils.toInteger(coercedValue, fieldName);
|
||||
break;
|
||||
case CHAR:
|
||||
case STRING:
|
||||
returnValue = coerceStringToLong(fieldName, coercedValue.toString());
|
||||
break;
|
||||
case BIGINT:
|
||||
returnValue = coercedValue;
|
||||
break;
|
||||
default:
|
||||
throw new ProcessException(
|
||||
String.format("Cannot use %s field referenced by %s as @timestamp.", chosenDataType, path.getPath())
|
||||
);
|
||||
}
|
||||
}
|
||||
case LONG -> DataTypeUtils.toLong(coercedValue, fieldName);
|
||||
case INT, BYTE, SHORT -> DataTypeUtils.toInteger(coercedValue, fieldName);
|
||||
case CHAR, STRING -> coerceStringToLong(fieldName, coercedValue.toString());
|
||||
case BIGINT -> coercedValue;
|
||||
default -> throw new ProcessException(
|
||||
String.format("Cannot use %s field referenced by %s as @timestamp.", chosenDataType, path.getPath())
|
||||
);
|
||||
};
|
||||
|
||||
if (!retain) {
|
||||
fieldValue.updateValue(null);
|
||||
|
@ -777,21 +759,12 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
|
|||
}
|
||||
|
||||
private String determineDateFormat(final RecordFieldType recordFieldType) {
|
||||
final String format;
|
||||
switch (recordFieldType) {
|
||||
case DATE:
|
||||
format = this.dateFormat;
|
||||
break;
|
||||
case TIME:
|
||||
format = this.timeFormat;
|
||||
break;
|
||||
case TIMESTAMP:
|
||||
format = this.timestampFormat;
|
||||
break;
|
||||
default:
|
||||
format = null;
|
||||
}
|
||||
return format;
|
||||
return switch (recordFieldType) {
|
||||
case DATE -> this.dateFormat;
|
||||
case TIME -> this.timeFormat;
|
||||
case TIMESTAMP -> this.timestampFormat;
|
||||
default -> null;
|
||||
};
|
||||
}
|
||||
|
||||
private Object coerceStringToLong(final String fieldName, final String stringValue) {
|
||||
|
|
|
@ -44,27 +44,24 @@ import org.apache.nifi.util.StringUtils;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute = "mime.type", description = "application/json"),
|
||||
@WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
|
||||
@WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
|
||||
@WritesAttribute(attribute = "page.number", description = "The number of the page (request), starting from 1, in which the results were returned that are in the output flowfile"),
|
||||
@WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile"),
|
||||
@WritesAttribute(attribute = "elasticsearch.query.error", description = "The error message provided by Elasticsearch if there is an error querying the index.")
|
||||
@WritesAttribute(attribute = "mime.type", description = "application/json"),
|
||||
@WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
|
||||
@WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
|
||||
@WritesAttribute(attribute = "page.number", description = "The number of the page (request), starting from 1, in which the results were returned that are in the output flowfile"),
|
||||
@WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile"),
|
||||
@WritesAttribute(attribute = "elasticsearch.query.error", description = "The error message provided by Elasticsearch if there is an error querying the index.")
|
||||
})
|
||||
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
|
||||
@TriggerSerially
|
||||
@PrimaryNodeOnly
|
||||
@DefaultSchedule(period="1 min")
|
||||
@DefaultSchedule(period = "1 min")
|
||||
@Tags({"elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "elasticsearch8", "query", "scroll", "page", "search", "json"})
|
||||
@CapabilityDescription("A processor that allows the user to repeatedly run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
|
||||
"Search After/Point in Time queries must include a valid \"sort\" field. The processor will retrieve multiple pages of results " +
|
||||
|
@ -97,26 +94,19 @@ public class SearchElasticsearch extends AbstractPaginatedJsonQueryElasticsearch
|
|||
"If the query is empty, a default JSON Object will be used, which will result in a \"match_all\" query in Elasticsearch.")
|
||||
.build();
|
||||
|
||||
private static final Set<Relationship> relationships;
|
||||
private static final Set<Relationship> relationships = Set.of(REL_HITS, REL_AGGREGATIONS);
|
||||
|
||||
static final List<PropertyDescriptor> scrollPropertyDescriptors;
|
||||
|
||||
static {
|
||||
final Set<Relationship> rels = new HashSet<>();
|
||||
rels.add(REL_HITS);
|
||||
rels.add(REL_AGGREGATIONS);
|
||||
relationships = Collections.unmodifiableSet(rels);
|
||||
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||
// ensure QUERY_DEFINITION_STYLE first for consistency between Elasticsearch processors
|
||||
descriptors.add(QUERY_DEFINITION_STYLE);
|
||||
descriptors.add(QUERY);
|
||||
descriptors.addAll(paginatedPropertyDescriptors.stream().filter(
|
||||
// override QUERY to change description (no FlowFile content used by SearchElasticsearch)
|
||||
pd -> !ElasticsearchRestProcessor.QUERY.equals(pd) && !QUERY_DEFINITION_STYLE.equals(pd)
|
||||
).collect(Collectors.toList()));
|
||||
scrollPropertyDescriptors = descriptors;
|
||||
}
|
||||
static final List<PropertyDescriptor> scrollPropertyDescriptors = Stream.concat(
|
||||
Stream.of(
|
||||
// ensure QUERY_DEFINITION_STYLE first for consistency between Elasticsearch processors
|
||||
QUERY_DEFINITION_STYLE,
|
||||
QUERY
|
||||
),
|
||||
paginatedPropertyDescriptors.stream().filter(
|
||||
// override QUERY to change description (no FlowFile content used by SearchElasticsearch)
|
||||
pd -> !ElasticsearchRestProcessor.QUERY.equals(pd) && !QUERY_DEFINITION_STYLE.equals(pd)
|
||||
)
|
||||
).toList();
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
|
|
|
@ -24,9 +24,9 @@ import org.apache.nifi.serialization.record.RecordSchema;
|
|||
import java.util.List;
|
||||
|
||||
public class BulkOperation {
|
||||
private List<IndexOperationRequest> operationList;
|
||||
private List<Record> originalRecords;
|
||||
private RecordSchema schema;
|
||||
private final List<IndexOperationRequest> operationList;
|
||||
private final List<Record> originalRecords;
|
||||
private final RecordSchema schema;
|
||||
|
||||
public BulkOperation(List<IndexOperationRequest> operationList, List<Record> originalRecords, RecordSchema schema) {
|
||||
this.operationList = operationList;
|
||||
|
|
|
@ -19,8 +19,6 @@ package org.apache.nifi.processors.elasticsearch.api;
|
|||
|
||||
import org.apache.nifi.components.DescribedValue;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
public enum PaginationType implements DescribedValue {
|
||||
SCROLL("pagination-scroll", "Use Elasticsearch \"_scroll\" API to page results. Does not accept additional query parameters."),
|
||||
SEARCH_AFTER("pagination-search_after", "Use Elasticsearch \"search_after\" _search API to page sorted results."),
|
||||
|
@ -48,9 +46,4 @@ public enum PaginationType implements DescribedValue {
|
|||
public String getDescription() {
|
||||
return description;
|
||||
}
|
||||
|
||||
public static PaginationType fromValue(final String value) {
|
||||
return Arrays.stream(PaginationType.values()).filter(v -> v.getValue().equals(value)).findFirst()
|
||||
.orElseThrow(() -> new IllegalArgumentException(String.format("Unknown value %s", value)));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,8 +19,6 @@ package org.apache.nifi.processors.elasticsearch.api;
|
|||
|
||||
import org.apache.nifi.components.DescribedValue;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
public enum QueryDefinitionType implements DescribedValue {
|
||||
FULL_QUERY("full", "Provide the full Query."),
|
||||
BUILD_QUERY("build", "Build the Query from separate JSON objects.");
|
||||
|
@ -47,9 +45,4 @@ public enum QueryDefinitionType implements DescribedValue {
|
|||
public String getDescription() {
|
||||
return description;
|
||||
}
|
||||
|
||||
public static QueryDefinitionType fromValue(final String value) {
|
||||
return Arrays.stream(QueryDefinitionType.values()).filter(v -> v.getValue().equals(value)).findFirst()
|
||||
.orElseThrow(() -> new IllegalArgumentException(String.format("Unknown value %s", value)));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.nifi.processors.elasticsearch.api;
|
|||
|
||||
import org.apache.nifi.components.DescribedValue;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
|
||||
public enum ResultOutputStrategy implements DescribedValue {
|
||||
|
@ -56,9 +55,4 @@ public enum ResultOutputStrategy implements DescribedValue {
|
|||
public static EnumSet<ResultOutputStrategy> getNonPaginatedResponseOutputStrategies() {
|
||||
return EnumSet.of(PER_RESPONSE, PER_HIT);
|
||||
}
|
||||
|
||||
public static ResultOutputStrategy fromValue(final String value) {
|
||||
return Arrays.stream(ResultOutputStrategy.values()).filter(v -> v.getValue().equals(value)).findFirst()
|
||||
.orElseThrow(() -> new IllegalArgumentException(String.format("Unknown value %s", value)));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,69 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.processors.elasticsearch.put;
|
||||
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
|
||||
public class FlowFileJsonDescription {
|
||||
private String index;
|
||||
private String type;
|
||||
private String id;
|
||||
private String content;
|
||||
|
||||
private JsonPath idJsonPath;
|
||||
private JsonPath typeJsonPath;
|
||||
private JsonPath indexJsonPath;
|
||||
|
||||
public FlowFileJsonDescription(String index, String type, String id, String content, JsonPath idJsonPath, JsonPath typeJsonPath, JsonPath indexJsonPath) {
|
||||
this.index = index;
|
||||
this.type = type;
|
||||
this.id = id;
|
||||
this.content = content;
|
||||
this.idJsonPath = idJsonPath;
|
||||
this.typeJsonPath = typeJsonPath;
|
||||
this.indexJsonPath = indexJsonPath;
|
||||
}
|
||||
|
||||
public String getIndex() {
|
||||
return index;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public String getContent() {
|
||||
return content;
|
||||
}
|
||||
|
||||
public JsonPath getIdJsonPath() {
|
||||
return idJsonPath;
|
||||
}
|
||||
|
||||
public JsonPath getTypeJsonPath() {
|
||||
return typeJsonPath;
|
||||
}
|
||||
|
||||
public JsonPath getIndexJsonPath() {
|
||||
return indexJsonPath;
|
||||
}
|
||||
}
|
|
@ -1,24 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.processors.elasticsearch.put;
|
||||
|
||||
public class JsonProcessingError extends Exception {
|
||||
public JsonProcessingError(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
|
@ -81,8 +81,8 @@ public abstract class AbstractByQueryElasticsearchTest {
|
|||
runner.assertTransferCount(AbstractByQueryElasticsearch.REL_SUCCESS, 1);
|
||||
|
||||
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractByQueryElasticsearch.REL_SUCCESS);
|
||||
final String attr = flowFiles.get(0).getAttribute(tookAttr());
|
||||
final String query = flowFiles.get(0).getAttribute(queryAttr());
|
||||
final String attr = flowFiles.getFirst().getAttribute(tookAttr());
|
||||
final String query = flowFiles.getFirst().getAttribute(queryAttr());
|
||||
assertNotNull(attr);
|
||||
assertEquals("100", attr);
|
||||
assertNotNull(query);
|
||||
|
@ -130,7 +130,7 @@ public abstract class AbstractByQueryElasticsearchTest {
|
|||
|
||||
@Test
|
||||
void testInvalidQueryProperty() {
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.FULL_QUERY.getValue());
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.FULL_QUERY);
|
||||
runner.setProperty(ElasticsearchRestProcessor.INDEX, "test-index");
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY, "not-json");
|
||||
|
||||
|
@ -140,7 +140,7 @@ public abstract class AbstractByQueryElasticsearchTest {
|
|||
|
||||
@Test
|
||||
void testInvalidQueryBuilderProperties() {
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY.getValue());
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY);
|
||||
runner.setProperty(ElasticsearchRestProcessor.INDEX, "test-index");
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_CLAUSE, "not-json");
|
||||
runner.setProperty(ElasticsearchRestProcessor.SCRIPT, "not-json-script");
|
||||
|
@ -221,7 +221,7 @@ public abstract class AbstractByQueryElasticsearchTest {
|
|||
runner.assertTransferCount(AbstractByQueryElasticsearch.REL_SUCCESS, 0);
|
||||
runner.assertTransferCount(AbstractByQueryElasticsearch.REL_FAILURE, 1);
|
||||
|
||||
final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(AbstractByQueryElasticsearch.REL_FAILURE).get(0);
|
||||
final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(AbstractByQueryElasticsearch.REL_FAILURE).getFirst();
|
||||
final String attr = mockFlowFile.getAttribute(errorAttr());
|
||||
assertNotNull(attr);
|
||||
}
|
||||
|
@ -280,7 +280,7 @@ public abstract class AbstractByQueryElasticsearchTest {
|
|||
@ParameterizedTest
|
||||
@MethodSource
|
||||
void testQueryBuilder(final String queryClause, final String script, final String expectedQuery) throws Exception {
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY.getValue());
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY);
|
||||
|
||||
if (queryClause != null) {
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_CLAUSE, queryClause);
|
||||
|
|
|
@ -176,7 +176,7 @@ public abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQ
|
|||
@Test
|
||||
void testInvalidQueryBuilderProperties() {
|
||||
final TestRunner runner = createRunner(false);
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY.getValue());
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY);
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_CLAUSE, "not-json");
|
||||
runner.setProperty(ElasticsearchRestProcessor.SIZE, "-1");
|
||||
runner.setProperty(ElasticsearchRestProcessor.AGGREGATIONS, "not-json-aggs");
|
||||
|
@ -193,10 +193,10 @@ public abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQ
|
|||
// test hits (no splitting) - full hit format
|
||||
final TestRunner runner = createRunner(false);
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, matchAllQuery);
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.FULL.getValue());
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.FULL);
|
||||
runOnce(runner);
|
||||
testCounts(runner, isInput() ? 1 : 0, 1, 0, 0);
|
||||
final MockFlowFile hits = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0);
|
||||
final MockFlowFile hits = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst();
|
||||
hits.assertAttributeEquals("hit.count", "10");
|
||||
assertOutputContent(hits.getContent(), 10, false);
|
||||
final List<Map<String, Object>> result = JsonUtils.readListOfMaps(hits.getContent());
|
||||
|
@ -211,8 +211,8 @@ public abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQ
|
|||
reset(runner);
|
||||
|
||||
// test splitting hits - _source only format
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue());
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.SOURCE_ONLY.getValue());
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT);
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.SOURCE_ONLY);
|
||||
runOnce(runner);
|
||||
testCounts(runner, isInput() ? 1 : 0, 10, 0, 0);
|
||||
|
||||
|
@ -235,8 +235,8 @@ public abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQ
|
|||
reset(runner);
|
||||
|
||||
// test splitting hits - metadata only format
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue());
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.METADATA_ONLY.getValue());
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT);
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.METADATA_ONLY);
|
||||
runOnce(runner);
|
||||
testCounts(runner, isInput() ? 1 : 0, 10, 0, 0);
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach(hit -> {
|
||||
|
@ -290,11 +290,11 @@ public abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQ
|
|||
void testAggregationsFullFormat() {
|
||||
final TestRunner runner = createRunner(true);
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, matchAllAggregationWithDefaultTermsQuery);
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.FULL.getValue());
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.FULL);
|
||||
runOnce(runner);
|
||||
testCounts(runner, isInput() ? 1 : 0, 1, 0, 1);
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10");
|
||||
final MockFlowFile aggregations = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS).get(0);
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count", "10");
|
||||
final MockFlowFile aggregations = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS).getFirst();
|
||||
aggregations.assertAttributeNotExists("aggregation.number");
|
||||
aggregations.assertAttributeNotExists("aggregation.name");
|
||||
// count == 1 because aggregations is a single Map rather than a List of Maps, even when there are multiple aggs
|
||||
|
@ -315,11 +315,11 @@ public abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQ
|
|||
final TestRunner runner = createRunner(true);
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, matchAllAggregationWithDefaultTermsQuery);
|
||||
runner.setIncomingConnection(false);
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.BUCKETS_ONLY.getValue());
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.BUCKETS_ONLY);
|
||||
runner.run(1, true, true);
|
||||
testCounts(runner, 0, 1, 0, 1);
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10");
|
||||
final MockFlowFile singleAgg = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS).get(0);
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count", "10");
|
||||
final MockFlowFile singleAgg = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS).getFirst();
|
||||
singleAgg.assertAttributeNotExists("aggregation.number");
|
||||
singleAgg.assertAttributeNotExists("aggregation.name");
|
||||
final Map<String, Object> agg2 = JsonUtils.readMap(singleAgg.getContent());
|
||||
|
@ -339,11 +339,11 @@ public abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQ
|
|||
void testSplittingAggregationsMetadataOnlyFormat() {
|
||||
final TestRunner runner = createRunner(true);
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, matchAllAggregationWithDefaultTermsQuery);
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue());
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.METADATA_ONLY.getValue());
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT);
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.METADATA_ONLY);
|
||||
runOnce(runner);
|
||||
testCounts(runner, isInput() ? 1 : 0, 1, 0, 2);
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10");
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count", "10");
|
||||
int a = 0;
|
||||
for (final MockFlowFile termAgg : runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS)) {
|
||||
termAgg.assertAttributeEquals("aggregation.name", a == 0 ? "term_agg" : "term_agg2");
|
||||
|
@ -366,11 +366,11 @@ public abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQ
|
|||
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, query);
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.INDEX, "${es.index}");
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.TYPE, "${es.type}");
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue());
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.FULL.getValue());
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT);
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.FULL);
|
||||
runOnce(runner);
|
||||
testCounts(runner, isInput() ? 1 : 0, 1, 0, 2);
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10");
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count", "10");
|
||||
int a = 0;
|
||||
for (final MockFlowFile termAgg : runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS)) {
|
||||
termAgg.assertAttributeEquals("aggregation.name", a == 0 ? "term_agg" : "term_agg2");
|
||||
|
@ -399,7 +399,7 @@ public abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQ
|
|||
|
||||
final TestRunner runner = createRunner(true);
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY_ATTRIBUTE, queryAttr);
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, queryDefinitionType.getValue());
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, queryDefinitionType);
|
||||
setQuery(runner, query);
|
||||
|
||||
runOnce(runner);
|
||||
|
@ -475,7 +475,7 @@ public abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQ
|
|||
((ConsumeElasticsearch) runner.getProcessor()).trackingRangeField = RANGE_FIELD_NAME;
|
||||
((ConsumeElasticsearch) runner.getProcessor()).trackingSortOrder = RANGE_SORT_ORDER;
|
||||
} else {
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY.getValue());
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY);
|
||||
}
|
||||
|
||||
if (queryClause != null) {
|
||||
|
@ -555,7 +555,7 @@ public abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQ
|
|||
final String expected;
|
||||
if (runner.getProcessor() instanceof ConsumeElasticsearch) {
|
||||
// test Range Field defined but no initial value
|
||||
runner.setProperty(ConsumeElasticsearch.QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY.getValue());
|
||||
runner.setProperty(ConsumeElasticsearch.QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY);
|
||||
runner.setProperty(ConsumeElasticsearch.RANGE_FIELD, RANGE_FIELD_NAME);
|
||||
|
||||
// should be no "query" (with no initial value) but "sort" added
|
||||
|
@ -570,7 +570,7 @@ public abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQ
|
|||
|
||||
void setQuery(final TestRunner runner, final String query) throws JsonProcessingException {
|
||||
if (runner.getProcessor() instanceof ConsumeElasticsearch) {
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY.getValue());
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY);
|
||||
}
|
||||
|
||||
if (QueryDefinitionType.BUILD_QUERY.getValue().equals(runner.getProcessContext().getProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE).getValue())) {
|
||||
|
|
|
@ -78,7 +78,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearchTest extends Abstra
|
|||
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, matchAllQuery);
|
||||
MockFlowFile input = runOnce(runner);
|
||||
AbstractJsonQueryElasticsearchTest.testCounts(runner, isInput() ? 1 : 0, 1, 0, 0);
|
||||
final MockFlowFile pageQueryHitsNoSplitting = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0);
|
||||
final MockFlowFile pageQueryHitsNoSplitting = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst();
|
||||
pageQueryHitsNoSplitting.assertAttributeEquals("hit.count", "10");
|
||||
pageQueryHitsNoSplitting.assertAttributeEquals("page.number", "1");
|
||||
AbstractJsonQueryElasticsearchTest.assertOutputContent(pageQueryHitsNoSplitting.getContent(), 10, false);
|
||||
|
@ -89,7 +89,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearchTest extends Abstra
|
|||
reset(runner);
|
||||
|
||||
// paged query hits splitting
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue());
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT);
|
||||
input = runOnce(runner);
|
||||
AbstractJsonQueryElasticsearchTest.testCounts(runner, isInput() ? 1 : 0, 10, 0, 0);
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach(hit -> {
|
||||
|
@ -103,10 +103,10 @@ public abstract class AbstractPaginatedJsonQueryElasticsearchTest extends Abstra
|
|||
reset(runner);
|
||||
|
||||
// paged query hits combined
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_QUERY.getValue());
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_QUERY);
|
||||
input = runOnce(runner);
|
||||
AbstractJsonQueryElasticsearchTest.testCounts(runner, isInput() ? 1 : 0, 1, 0, 0);
|
||||
final MockFlowFile pageQueryHitsCombined = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0);
|
||||
final MockFlowFile pageQueryHitsCombined = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst();
|
||||
pageQueryHitsCombined.assertAttributeEquals("hit.count", "10");
|
||||
pageQueryHitsCombined.assertAttributeEquals("page.number", "1");
|
||||
AbstractJsonQueryElasticsearchTest.assertOutputContent(pageQueryHitsCombined.getContent(), 10, true);
|
||||
|
@ -183,15 +183,15 @@ public abstract class AbstractPaginatedJsonQueryElasticsearchTest extends Abstra
|
|||
void testResultsFormat(final ResultOutputStrategy resultOutputStrategy) throws JsonProcessingException {
|
||||
final TestRunner runner = createRunner(false);
|
||||
setQuery(runner, matchAllWithSortByMessage);
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, resultOutputStrategy.getValue());
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, resultOutputStrategy);
|
||||
|
||||
// Test against each result format
|
||||
for (final SearchResultsFormat searchResultsFormat : SearchResultsFormat.values()) {
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, searchResultsFormat.getValue());
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, searchResultsFormat);
|
||||
|
||||
// Test against each pagination type
|
||||
for (final PaginationType paginationType : PaginationType.values()) {
|
||||
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue());
|
||||
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType);
|
||||
|
||||
runOnce(runner);
|
||||
assertResultsFormat(runner, resultOutputStrategy, searchResultsFormat);
|
||||
|
@ -205,7 +205,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearchTest extends Abstra
|
|||
final TestRunner runner = createRunner(false);
|
||||
final TestElasticsearchClientService service = AbstractJsonQueryElasticsearchTest.getService(runner);
|
||||
service.setThrowErrorInDelete(true);
|
||||
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.SCROLL.getValue());
|
||||
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.SCROLL);
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, matchAllWithSortByMsgWithoutSize);
|
||||
|
||||
// still expect "success" output for exception during final clean-up
|
||||
|
@ -221,7 +221,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearchTest extends Abstra
|
|||
final TestRunner runner = createRunner(false);
|
||||
final TestElasticsearchClientService service = AbstractJsonQueryElasticsearchTest.getService(runner);
|
||||
service.setMaxPages(2);
|
||||
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.SCROLL.getValue());
|
||||
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.SCROLL);
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, matchAllWithSortByMsgWithSizeQuery);
|
||||
|
||||
if (getStateScope() != null) {
|
||||
|
@ -246,9 +246,9 @@ public abstract class AbstractPaginatedJsonQueryElasticsearchTest extends Abstra
|
|||
final TestRunner runner = createRunner(false);
|
||||
final TestElasticsearchClientService service = AbstractJsonQueryElasticsearchTest.getService(runner);
|
||||
service.setThrowErrorInDelete(true);
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.FULL.getValue());
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.FULL.getValue());
|
||||
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.POINT_IN_TIME.getValue());
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.FULL);
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.FULL);
|
||||
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.POINT_IN_TIME);
|
||||
setQuery(runner, matchAllWithSortByMsgWithoutSize);
|
||||
|
||||
// still expect "success" output for exception during final clean-up
|
||||
|
@ -264,7 +264,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearchTest extends Abstra
|
|||
final TestRunner runner = createRunner(false);
|
||||
final TestElasticsearchClientService service = AbstractJsonQueryElasticsearchTest.getService(runner);
|
||||
service.setThrowErrorInPit(true);
|
||||
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.POINT_IN_TIME.getValue());
|
||||
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.POINT_IN_TIME);
|
||||
setQuery(runner, matchAllWithSortByMsgWithoutSize);
|
||||
|
||||
// expect "failure" output for exception during query setup
|
||||
|
@ -279,7 +279,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearchTest extends Abstra
|
|||
@EnumSource(PaginationType.class)
|
||||
void testPaginatedQueryWithoutSort(final PaginationType paginationType) throws JsonProcessingException {
|
||||
final TestRunner runner = createRunner(false);
|
||||
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue());
|
||||
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType);
|
||||
setQuery(runner, matchAllQuery);
|
||||
|
||||
if (PaginationType.SCROLL == paginationType) {
|
||||
|
@ -309,12 +309,12 @@ public abstract class AbstractPaginatedJsonQueryElasticsearchTest extends Abstra
|
|||
final TestRunner runner = createRunner(false);
|
||||
final TestElasticsearchClientService service = AbstractJsonQueryElasticsearchTest.getService(runner);
|
||||
service.setMaxPages(2);
|
||||
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue());
|
||||
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType);
|
||||
setQuery(runner, matchAllWithSortByMsgWithSizeQuery);
|
||||
|
||||
// Tests flowfile per page, hits splitting and hits combined
|
||||
for (final ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {
|
||||
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, resultOutputStrategy.getValue());
|
||||
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, resultOutputStrategy);
|
||||
for (int iteration = 1; iteration < 4; iteration++) {
|
||||
// Check that changing OUTPUT_NO_HITS doesn't have any adverse effects on pagination
|
||||
runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS, String.valueOf(iteration % 2 > 0).toLowerCase());
|
||||
|
@ -362,19 +362,19 @@ public abstract class AbstractPaginatedJsonQueryElasticsearchTest extends Abstra
|
|||
final TestRunner runner = createRunner(false);
|
||||
final TestElasticsearchClientService service = AbstractJsonQueryElasticsearchTest.getService(runner);
|
||||
service.setMaxPages(0);
|
||||
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue());
|
||||
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType);
|
||||
setQuery(runner, matchAllWithSortByMessage);
|
||||
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.OUTPUT_NO_HITS, "true");
|
||||
|
||||
for (final ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {
|
||||
// test that an empty flow file is produced for a per query setup
|
||||
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, resultOutputStrategy.getValue());
|
||||
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, resultOutputStrategy);
|
||||
runOnce(runner);
|
||||
AbstractJsonQueryElasticsearchTest.testCounts(runner, isInput() ? 1 : 0, 1, 0, 0);
|
||||
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "0");
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1");
|
||||
assertEquals(0, runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize());
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count", "0");
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("page.number", "1");
|
||||
assertEquals(0, runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().getSize());
|
||||
reset(runner);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -150,14 +150,14 @@ public class ConsumeElasticsearchTest extends SearchElasticsearchTest {
|
|||
if ("asc".equals(sortOrder)) {
|
||||
if (hasHits) {
|
||||
final List<Map<String, Object>> expectedHits = pageCount == 0 ? FIRST_PAGE_OF_HITS : SECOND_PAGE_OF_HITS;
|
||||
assertEquals(getHitValue(expectedHits.get(expectedHits.size() - 1)), paginatedJsonQueryParameters.getTrackingRangeValue());
|
||||
assertEquals(getHitValue(expectedHits.getLast()), paginatedJsonQueryParameters.getTrackingRangeValue());
|
||||
} else {
|
||||
assertEquals(defaultUnset, paginatedJsonQueryParameters.getTrackingRangeValue());
|
||||
}
|
||||
} else {
|
||||
if (pageCount == 0) {
|
||||
if (hasHits) {
|
||||
assertEquals(getHitValue(FIRST_PAGE_OF_HITS.get(0)), paginatedJsonQueryParameters.getTrackingRangeValue());
|
||||
assertEquals(getHitValue(FIRST_PAGE_OF_HITS.getFirst()), paginatedJsonQueryParameters.getTrackingRangeValue());
|
||||
} else {
|
||||
assertEquals(defaultUnset, paginatedJsonQueryParameters.getTrackingRangeValue());
|
||||
}
|
||||
|
|
|
@ -111,7 +111,7 @@ public class GetElasticsearchTest {
|
|||
public void testFetch() {
|
||||
runProcessor(runner);
|
||||
testCounts(runner, 1, 0, 0);
|
||||
final MockFlowFile doc = runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).get(0);
|
||||
final MockFlowFile doc = runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).getFirst();
|
||||
assertOutputContent(doc.getContent());
|
||||
assertEquals(1L, runner.getProvenanceEvents().stream()
|
||||
.filter(event -> ProvenanceEventType.RECEIVE.equals(event.getEventType()) && event.getAttribute("uuid").equals(doc.getAttribute("uuid"))).count());
|
||||
|
@ -123,7 +123,7 @@ public class GetElasticsearchTest {
|
|||
runner.setIncomingConnection(true);
|
||||
runProcessor(runner);
|
||||
testCounts(runner, 1, 0, 0);
|
||||
MockFlowFile doc = runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).get(0);
|
||||
MockFlowFile doc = runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).getFirst();
|
||||
assertOutputContent(doc.getContent());
|
||||
assertCommonAttributes(doc);
|
||||
assertOutputAttribute(doc, false);
|
||||
|
@ -132,7 +132,7 @@ public class GetElasticsearchTest {
|
|||
runner.setIncomingConnection(false);
|
||||
runner.run();
|
||||
testCounts(runner, 1, 0, 0);
|
||||
doc = runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).get(0);
|
||||
doc = runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).getFirst();
|
||||
assertOutputContent(doc.getContent());
|
||||
assertCommonAttributes(doc);
|
||||
assertOutputAttribute(doc, false);
|
||||
|
@ -144,7 +144,7 @@ public class GetElasticsearchTest {
|
|||
runner.setIncomingConnection(true);
|
||||
runProcessor(runner);
|
||||
testCounts(runner, 1, 0, 0);
|
||||
MockFlowFile doc = runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).get(0);
|
||||
MockFlowFile doc = runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).getFirst();
|
||||
assertEquals("test", doc.getContent());
|
||||
assertCommonAttributes(doc);
|
||||
assertOutputAttribute(doc, true);
|
||||
|
@ -156,7 +156,7 @@ public class GetElasticsearchTest {
|
|||
runner.setIncomingConnection(false);
|
||||
runner.run();
|
||||
testCounts(runner, 1, 0, 0);
|
||||
doc = runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).get(0);
|
||||
doc = runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).getFirst();
|
||||
assertEquals("", doc.getContent());
|
||||
assertCommonAttributes(doc, false);
|
||||
assertOutputAttribute(doc, true, "my_attr");
|
||||
|
@ -188,7 +188,7 @@ public class GetElasticsearchTest {
|
|||
runner.setProperty(GetElasticsearch.ID, "${noAttribute}");
|
||||
runProcessor(runner);
|
||||
testCounts(runner, 0, 1, 0);
|
||||
final MockFlowFile failed = runner.getFlowFilesForRelationship(GetElasticsearch.REL_FAILURE).get(0);
|
||||
final MockFlowFile failed = runner.getFlowFilesForRelationship(GetElasticsearch.REL_FAILURE).getFirst();
|
||||
failed.assertAttributeEquals("elasticsearch.get.error", GetElasticsearch.ID.getDisplayName() + " is blank (after evaluating attribute expressions), cannot GET document");
|
||||
reset(runner);
|
||||
}
|
||||
|
|
|
@ -59,9 +59,9 @@ public class PaginatedJsonQueryElasticsearchTest extends AbstractPaginatedJsonQu
|
|||
case PER_QUERY:
|
||||
final int expectedHits = 20;
|
||||
AbstractJsonQueryElasticsearchTest.testCounts(runner, 1, 1, 0, 0);
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", Integer.toString(expectedHits));
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "2");
|
||||
assertEquals(expectedHits, runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getContent().split("\n").length);
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count", Integer.toString(expectedHits));
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("page.number", "2");
|
||||
assertEquals(expectedHits, runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().getContent().split("\n").length);
|
||||
break;
|
||||
case PER_HIT:
|
||||
AbstractJsonQueryElasticsearchTest.testCounts(runner, 1, 20, 0, 0);
|
||||
|
|
|
@ -186,7 +186,7 @@ public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest<PutEl
|
|||
assertEquals(1L, opCount);
|
||||
assertEquals(1L, headerFieldsCount);
|
||||
|
||||
final Map<String, String> headerFields = items.get(0).getHeaderFields();
|
||||
final Map<String, String> headerFields = items.getFirst().getHeaderFields();
|
||||
assertEquals(2, headerFields.size());
|
||||
assertEquals("1", headerFields.get("routing"));
|
||||
assertEquals("external", headerFields.get("version"));
|
||||
|
@ -217,7 +217,7 @@ public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest<PutEl
|
|||
assertEquals(1L, nullIdCount);
|
||||
assertEquals(1L, headerFieldsCount);
|
||||
|
||||
final Map<String, String> headerFields = items.get(0).getHeaderFields();
|
||||
final Map<String, String> headerFields = items.getFirst().getHeaderFields();
|
||||
assertEquals(2, headerFields.size());
|
||||
assertEquals("1", headerFields.get("routing"));
|
||||
assertEquals("external", headerFields.get("version"));
|
||||
|
@ -335,7 +335,7 @@ public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest<PutEl
|
|||
runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 3);
|
||||
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
|
||||
|
||||
MockFlowFile failedDoc = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS).get(0);
|
||||
MockFlowFile failedDoc = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS).getFirst();
|
||||
assertTrue(failedDoc.getContent().contains("20abcd"));
|
||||
failedDoc.assertAttributeExists("elasticsearch.bulk.error");
|
||||
failedDoc.assertAttributeNotExists("elasticsearch.put.error");
|
||||
|
@ -376,7 +376,7 @@ public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest<PutEl
|
|||
runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 4);
|
||||
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 1);
|
||||
|
||||
MockFlowFile failedDoc = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS).get(0);
|
||||
MockFlowFile failedDoc = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS).getFirst();
|
||||
assertTrue(failedDoc.getContent().contains("not_found"));
|
||||
failedDoc.assertAttributeExists("elasticsearch.bulk.error");
|
||||
failedDoc.assertAttributeNotExists("elasticsearch.put.error");
|
||||
|
@ -400,7 +400,7 @@ public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest<PutEl
|
|||
failedDoc.assertAttributeNotExists("elasticsearch.put.error");
|
||||
assertTrue(failedDoc.getAttribute("elasticsearch.bulk.error").contains("some_other_exception"));
|
||||
|
||||
final String errorResponses = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_ERROR_RESPONSES).get(0).getContent();
|
||||
final String errorResponses = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_ERROR_RESPONSES).getFirst().getContent();
|
||||
assertTrue(errorResponses.contains("not_found"));
|
||||
assertTrue(errorResponses.contains("For input string: 20abc"));
|
||||
assertTrue(errorResponses.contains("For input string: 213,456.9"));
|
||||
|
@ -441,7 +441,7 @@ public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest<PutEl
|
|||
runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 0);
|
||||
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
|
||||
|
||||
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILURE).get(0);
|
||||
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILURE).getFirst();
|
||||
assertTrue(flowFile.getAttribute("elasticsearch.put.error").contains("not"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -483,7 +483,7 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
|
|||
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0);
|
||||
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0);
|
||||
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
|
||||
final MockFlowFile failure = runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILURE).get(0);
|
||||
final MockFlowFile failure = runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILURE).getFirst();
|
||||
failure.assertAttributeEquals("elasticsearch.put.error", String.format("Field referenced by %s must be Map-type compatible or a String parsable into a JSON Object", "/dynamic_templates"));
|
||||
}
|
||||
|
||||
|
@ -666,8 +666,8 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
|
|||
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 1);
|
||||
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 1);
|
||||
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
|
||||
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS).get(0).assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, String.valueOf(errorCount));
|
||||
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS).get(0).assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT,
|
||||
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS).getFirst().assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, String.valueOf(errorCount));
|
||||
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS).getFirst().assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT,
|
||||
String.valueOf(successCount));
|
||||
}
|
||||
|
||||
|
@ -691,7 +691,7 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
|
|||
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "1");
|
||||
assertTrue(runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS).get(1)
|
||||
.getAttribute("elasticsearch.bulk.error").contains("some_other_exception"));
|
||||
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS).get(0)
|
||||
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS).getFirst()
|
||||
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, String.valueOf(successCount));
|
||||
}
|
||||
|
||||
|
@ -707,9 +707,9 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
|
|||
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 1);
|
||||
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 0);
|
||||
|
||||
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS).get(0)
|
||||
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS).getFirst()
|
||||
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, String.valueOf(errorCount));
|
||||
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS).get(0)
|
||||
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS).getFirst()
|
||||
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, String.valueOf(successCount));
|
||||
}
|
||||
|
||||
|
@ -737,7 +737,7 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
|
|||
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "1");
|
||||
assertTrue(runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS).get(2)
|
||||
.getAttribute("elasticsearch.bulk.error").contains("not_found"));
|
||||
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS).get(0)
|
||||
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS).getFirst()
|
||||
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, String.valueOf(successCount));
|
||||
}
|
||||
|
||||
|
@ -752,7 +752,7 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
|
|||
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0);
|
||||
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0);
|
||||
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 1);
|
||||
final String errorResponses = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_ERROR_RESPONSES).get(0).getContent();
|
||||
final String errorResponses = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_ERROR_RESPONSES).getFirst().getContent();
|
||||
assertTrue(errorResponses.contains("not_found"));
|
||||
assertTrue(errorResponses.contains("For input string: 20abc"));
|
||||
assertTrue(errorResponses.contains("For input string: 213,456.9"));
|
||||
|
@ -856,7 +856,7 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest<Put
|
|||
msg.computeIfPresent("time", (key, val) -> Time.valueOf(LOCAL_TIME).getTime());
|
||||
msg.computeIfPresent("choice_ts", (key, val) -> Timestamp.valueOf(LOCAL_DATE_TIME).toInstant().toEpochMilli());
|
||||
});
|
||||
parsedJson.get(parsedJson.size() - 1).computeIfPresent("choice_ts", (key, val) -> "not-timestamp");
|
||||
parsedJson.getLast().computeIfPresent("choice_ts", (key, val) -> "not-timestamp");
|
||||
return JsonUtils.prettyPrint(parsedJson);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -58,15 +58,15 @@ public class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticse
|
|||
final TestRunner runner = createRunner(false);
|
||||
final TestElasticsearchClientService service = AbstractJsonQueryElasticsearchTest.getService(runner);
|
||||
service.setMaxPages(2);
|
||||
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue());
|
||||
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType);
|
||||
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE, "1 sec");
|
||||
setQuery(runner, matchAllWithSortByMsgWithSizeQuery);
|
||||
|
||||
// first page
|
||||
runOnce(runner);
|
||||
AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 1, 0, 0);
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10");
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1");
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count", "10");
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("page.number", "1");
|
||||
assertState(runner, paginationType, 10, 1);
|
||||
if (runner.getProcessor() instanceof ConsumeElasticsearch) {
|
||||
assertFalse(getService(runner).getQuery().contains("\"five\""));
|
||||
|
@ -89,8 +89,8 @@ public class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticse
|
|||
// first page again (new query after first query expired)
|
||||
runOnce(runner);
|
||||
AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 1, 0, 0);
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10");
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1");
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count", "10");
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("page.number", "1");
|
||||
assertState(runner, paginationType, 10, 1);
|
||||
if (runner.getProcessor() instanceof ConsumeElasticsearch) {
|
||||
// trackingRangeValue should be retained after previous query expiry
|
||||
|
@ -101,8 +101,8 @@ public class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticse
|
|||
// second page
|
||||
runOnce(runner);
|
||||
AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 1, 0, 0);
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10");
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "2");
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count", "10");
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("page.number", "2");
|
||||
assertState(runner, paginationType, 20, 2);
|
||||
if (runner.getProcessor() instanceof ConsumeElasticsearch) {
|
||||
assertTrue(getService(runner).getQuery().contains("\"five\""));
|
||||
|
@ -118,8 +118,8 @@ public class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticse
|
|||
|
||||
if (perResponseResultOutputStrategy && (iteration == 1 || iteration == 2)) {
|
||||
AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 1, 0, 0);
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10");
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", String.valueOf(iteration));
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count", "10");
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("page.number", String.valueOf(iteration));
|
||||
assertState(runner, paginationType, expectedHitCount, iteration);
|
||||
} else if (perHitResultOutputStrategy && (iteration == 1 || iteration == 2)) {
|
||||
AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 10, 0, 0);
|
||||
|
@ -137,10 +137,10 @@ public class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticse
|
|||
}
|
||||
} else if (ResultOutputStrategy.PER_QUERY.equals(resultOutputStrategy)) {
|
||||
AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 1, 0, 0);
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "20");
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count", "20");
|
||||
// the "last" page.number is used, so 2 here because there were 2 pages of hits
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "2");
|
||||
assertEquals(20, runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getContent().split("\n").length);
|
||||
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("page.number", "2");
|
||||
assertEquals(20, runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().getContent().split("\n").length);
|
||||
if (runner.getProcessor() instanceof ConsumeElasticsearch) {
|
||||
assertEquals("five", runner.getStateManager().getState(getStateScope()).get(ConsumeElasticsearch.STATE_RANGE_VALUE));
|
||||
} else {
|
||||
|
|
|
@ -28,7 +28,7 @@ public abstract class AbstractByQueryElasticsearch_IT extends AbstractElasticsea
|
|||
Assumptions.assumeFalse(runner.getProcessor() instanceof DeleteByQueryElasticsearch,
|
||||
"DeleteByQueryElasticsearch does not use the SCRIPT property");
|
||||
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY.getValue());
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY);
|
||||
runner.setProperty(ElasticsearchRestProcessor.SCRIPT, "{\"source\": \"ctx._source.num++\", \"lang\": \"painless\"}");
|
||||
|
||||
assertQueryVerify(3, 0);
|
||||
|
|
|
@ -39,7 +39,7 @@ public abstract class AbstractElasticsearchRestProcessor_IT extends AbstractElas
|
|||
void testVerifyFullQueryInvalidJson() {
|
||||
Assumptions.assumeFalse(isConsumeElasticsearch(), "ConsumeElasticsearch does not use the FULL_QUERY Definition Type");
|
||||
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.FULL_QUERY.getValue());
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.FULL_QUERY);
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY, "{\"query\":");
|
||||
|
||||
final List<ConfigVerificationResult> results = ((VerifiableProcessor) runner.getProcessor()).verify(
|
||||
|
@ -67,7 +67,7 @@ public abstract class AbstractElasticsearchRestProcessor_IT extends AbstractElas
|
|||
void testVerifyFullQueryValid() {
|
||||
Assumptions.assumeFalse(isConsumeElasticsearch(), "ConsumeElasticsearch does not use the FULL_QUERY Definition Type");
|
||||
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.FULL_QUERY.getValue());
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.FULL_QUERY);
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY, "{\"query\":{\"term\":{\"msg\":\"one\"}}, \"aggs\":{\"messages\":{\"terms\":{\"field\":\"msg\"}}}}");
|
||||
|
||||
assertQueryVerify(1, 1);
|
||||
|
@ -77,7 +77,7 @@ public abstract class AbstractElasticsearchRestProcessor_IT extends AbstractElas
|
|||
void testVerifyFullQueryValidEmptyQuery() {
|
||||
Assumptions.assumeFalse(isConsumeElasticsearch(), "ConsumeElasticsearch does not use the FULL_QUERY Definition Type");
|
||||
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.FULL_QUERY.getValue());
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.FULL_QUERY);
|
||||
runner.removeProperty(ElasticsearchRestProcessor.QUERY); // should run a default "match_all" query
|
||||
|
||||
assertQueryVerify(3, 0);
|
||||
|
@ -87,7 +87,7 @@ public abstract class AbstractElasticsearchRestProcessor_IT extends AbstractElas
|
|||
void testVerifyFullQueryInvalid() {
|
||||
Assumptions.assumeFalse(isConsumeElasticsearch(), "ConsumeElasticsearch does not use the FULL_QUERY Definition Type");
|
||||
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.FULL_QUERY.getValue());
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.FULL_QUERY);
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY, "{\"query\":{\"unknown\":{}}}");
|
||||
|
||||
final List<ConfigVerificationResult> results = assertVerify(2);
|
||||
|
@ -106,7 +106,7 @@ public abstract class AbstractElasticsearchRestProcessor_IT extends AbstractElas
|
|||
void testVerifyBuildQueryValidQueryClause() {
|
||||
Assumptions.assumeFalse(isConsumeElasticsearch(), "ConsumeElasticsearch does not use the QUERY_CLAUSE");
|
||||
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY.getValue());
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY);
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_CLAUSE, "{\"term\":{\"msg\":\"one\"}}");
|
||||
|
||||
assertQueryVerify(1, 0);
|
||||
|
|
|
@ -31,10 +31,9 @@ import org.junit.jupiter.api.Test;
|
|||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
|
||||
abstract class AbstractElasticsearch_IT<P extends ElasticsearchRestProcessor> extends AbstractElasticsearchITBase {
|
||||
static final List<String> TEST_INDICES = Collections.singletonList("messages");
|
||||
|
@ -54,7 +53,7 @@ abstract class AbstractElasticsearch_IT<P extends ElasticsearchRestProcessor> ex
|
|||
runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, elasticsearchHost);
|
||||
runner.setProperty(service, ElasticSearchClientService.CONNECT_TIMEOUT, "10000");
|
||||
runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000");
|
||||
runner.setProperty(service, ElasticSearchClientService.SUPPRESS_NULLS, ElasticSearchClientService.ALWAYS_SUPPRESS.getValue());
|
||||
runner.setProperty(service, ElasticSearchClientService.SUPPRESS_NULLS, ElasticSearchClientService.ALWAYS_SUPPRESS);
|
||||
runner.setProperty(service, ElasticSearchClientService.USERNAME, "elastic");
|
||||
runner.setProperty(service, ElasticSearchClientService.PASSWORD, ELASTIC_USER_PASSWORD);
|
||||
|
||||
|
@ -102,12 +101,12 @@ abstract class AbstractElasticsearch_IT<P extends ElasticsearchRestProcessor> ex
|
|||
private void assertIndexVerificationResults(final List<ConfigVerificationResult> results, final boolean expectedExists, final String expectedExplanation)
|
||||
throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
|
||||
// allow for extra verification test results beyond Index Exist
|
||||
assertTrue(results.size() >= 1);
|
||||
assertFalse(results.isEmpty());
|
||||
final List<ConfigVerificationResult> indexResults = results.stream()
|
||||
.filter(result -> ElasticsearchRestProcessor.VERIFICATION_STEP_INDEX_EXISTS.equals(result.getVerificationStepName()))
|
||||
.collect(Collectors.toList());
|
||||
.toList();
|
||||
assertEquals(1, indexResults.size(), results.toString());
|
||||
final ConfigVerificationResult result = indexResults.get(0);
|
||||
final ConfigVerificationResult result = indexResults.getFirst();
|
||||
|
||||
final ConfigVerificationResult.Outcome expectedOutcome;
|
||||
if (getProcessor().isIndexNotExistSuccessful()) {
|
||||
|
|
|
@ -25,7 +25,7 @@ public abstract class AbstractJsonQueryElasticsearch_IT extends AbstractElastics
|
|||
@BeforeEach
|
||||
public void setUp() {
|
||||
// set Query Definition Style and default Query Clause for all tests, allowing for ConsumeElasticsearch test override
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY.getValue());
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY);
|
||||
runner.setProperty(ElasticsearchRestProcessor.QUERY_CLAUSE, "{\"match_all\":{}}");
|
||||
}
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@ class ConsumeElasticsearch_IT extends AbstractJsonQueryElasticsearch_IT {
|
|||
private static final String RANGE_SORT_ORDER = "asc";
|
||||
|
||||
ElasticsearchRestProcessor getProcessor() {
|
||||
return new TestConsumeElasticsearch();
|
||||
return new ConsumeElasticsearch();
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
|
|
|
@ -1,29 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License") you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.elasticsearch.integration;
|
||||
|
||||
import org.apache.nifi.processors.elasticsearch.ConsumeElasticsearch;
|
||||
|
||||
public class TestConsumeElasticsearch extends ConsumeElasticsearch {
|
||||
void setTrackingRangeField(final String trackingRangeField) {
|
||||
super.trackingRangeField = trackingRangeField;
|
||||
}
|
||||
|
||||
void setTrackingSortOrder(final String trackingSortOrder) {
|
||||
super.trackingSortOrder = trackingSortOrder;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue