mirror of https://github.com/apache/nifi.git
NIFI-14073 Added PMD Plugin to contrib-check Profile
- Added standard PMD rules scoped to current project conventions
- Updated multiple classes to meet UseDiamondOperator rule requirements
- Applied selected SuppressWarnings to selected classes
- Streamlined ci-workflow build to avoid unnecessary exclusions
- Refactored assembly module phases to align with standard lifecycle behavior

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #9581.
This commit is contained in:
parent e4cb5a3615
commit 43faa73b4e
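For context, the PMD-driven cleanups in this commit (diamond operators, dropping redundant class qualifiers on static members, and explicit suppressions for intentionally retained fields) follow the pattern sketched below. This is an illustrative sketch only; the class and member names are hypothetical and do not appear in the commit.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExampleCache {

    // UseDiamondOperator: "new HashMap<String, List<String>>()" becomes the diamond form
    private final Map<String, List<String>> entries = new HashMap<>();

    // Fields flagged by PMD but intentionally kept are suppressed explicitly
    @SuppressWarnings("PMD.UnusedPrivateField")
    private String reserved;

    // Redundant qualifier dropped when calling a static member of the same class
    public static ExampleCache defaultCache() {
        return create(100); // previously written as ExampleCache.create(100)
    }

    private static ExampleCache create(final int size) {
        return new ExampleCache();
    }
}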
@@ -23,7 +23,6 @@ env:
DEFAULT_MAVEN_OPTS: >-
-Xmx4g
-XX:ReservedCodeCacheSize=1g
-XX:+UseG1GC
-Dorg.slf4j.simpleLogger.defaultLogLevel=WARN
-Daether.connector.http.retryHandler.count=5
-Daether.connector.http.connectionMaxTtl=30
@@ -37,25 +36,6 @@ env:
--no-snapshot-updates
--no-transfer-progress
--fail-fast
-pl -:minifi-integration-tests
-pl -:minifi-assembly
-pl -:nifi-assembly
-pl -:nifi-toolkit-assembly
-pl -:nifi-registry-assembly
-pl -:nifi-registry-toolkit-assembly
-pl -:nifi-runtime-manifest
-pl -:nifi-runtime-manifest-test
-pl -:nifi-stateless-assembly
-pl -:nifi-stateless-system-test-suite
-pl -:nifi-system-test-suite
-pl -:nifi-nar-provider-assembly
-pl -:nifi-py4j-integration-tests
-pl -:nifi-server-nar
-pl -:nifi-ui
-pl -:nifi-jolt-nar
-pl -:nifi-jolt-transform-json-ui
-pl -:nifi-standard-content-viewer-nar
-pl -:nifi-standard-content-viewer
MAVEN_VERIFY_COMMAND: >-
verify
--show-version
@@ -63,15 +43,6 @@ env:
--no-transfer-progress
--fail-fast
-D dir-only
MAVEN_BUILD_PROFILES: >-
-P skip-nifi-bin-assembly
MAVEN_PROJECTS: >-
-pl -minifi/minifi-assembly
-pl -minifi/minifi-toolkit/minifi-toolkit-assembly
-pl -nifi-registry/nifi-registry-assembly
-pl -nifi-registry/nifi-registry-toolkit/nifi-registry-toolkit-assembly
-pl -nifi-stateless/nifi-stateless-assembly
-pl -nifi-toolkit/nifi-toolkit-assembly
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
@@ -124,8 +95,10 @@ jobs:
env:
MAVEN_OPTS: >-
${{ env.COMPILE_MAVEN_OPTS }}
# Run PMD Check with compile phase to resolve modules
run: >
${{ env.MAVEN_COMMAND }}
pmd:check
${{ env.MAVEN_COMPILE_COMMAND }}
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v3
@@ -189,9 +162,7 @@ jobs:
run: >
${{ env.MAVEN_COMMAND }}
${{ env.MAVEN_VERIFY_COMMAND }}
${{ env.MAVEN_BUILD_PROFILES }}
-P python-unit-tests
${{ env.MAVEN_PROJECTS }}
- name: Upload Test Reports
uses: actions/upload-artifact@v4
with:
@@ -266,9 +237,7 @@ jobs:
run: >-
${{ env.MAVEN_COMMAND }}
${{ env.MAVEN_VERIFY_COMMAND }}
${{ env.MAVEN_BUILD_PROFILES }}
-P python-unit-tests
${{ env.MAVEN_PROJECTS }}
- name: Upload Test Reports
uses: actions/upload-artifact@v4
with:
@@ -341,8 +310,6 @@ jobs:
run: >-
${{ env.MAVEN_COMMAND_WINDOWS }}
${{ env.MAVEN_VERIFY_COMMAND }}
${{ env.MAVEN_BUILD_PROFILES }}
${{ env.MAVEN_PROJECTS }}
- name: Upload Test Reports
uses: actions/upload-artifact@v4
with:

@@ -61,6 +61,7 @@ env:
install
MAVEN_BUILD_PROJECTS: >-
-pl nifi-assembly
-pl nifi-registry/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-assembly
-pl nifi-registry/nifi-registry-assembly
-pl nifi-toolkit/nifi-toolkit-assembly
-pl minifi/minifi-assembly
@@ -70,6 +71,7 @@ env:
clean
MAVEN_CLEAN_PROJECTS: >-
-pl -nifi-assembly
-pl -nifi-registry/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-assembly
-pl -nifi-registry/nifi-registry-assembly
-pl -nifi-toolkit/nifi-toolkit-assembly
-pl -nifi-toolkit/nifi-toolkit-cli

@@ -34,6 +34,7 @@ public class PropertyDescriptor implements Serializable {
private boolean required;
private boolean sensitive;
private ExpressionLanguageScope expressionLanguageScope = ExpressionLanguageScope.NONE;
@SuppressWarnings("PMD.UnusedPrivateField")
private String expressionLanguageScopeDescription = ExpressionLanguageScope.NONE.getDescription();
private DefinedType typeProvidedByValue;
private String validRegex;

@@ -32,7 +32,7 @@ public enum SensitiveProperty {
NIFI_MINIFI_SECURITY_TRUSTSTORE_PASSWORD("nifi.minifi.security.truststorePasswd"),
NIFI_MINIFI_SENSITIVE_PROPS_KEY("nifi.minifi.sensitive.props.key");
public static final Set<String> SENSITIVE_PROPERTIES = Arrays.stream(SensitiveProperty.values()).map(SensitiveProperty::getKey)
public static final Set<String> SENSITIVE_PROPERTIES = Arrays.stream(values()).map(SensitiveProperty::getKey)
.collect(collectingAndThen(toSet(), Collections::unmodifiableSet));
private final String key;

@@ -139,7 +139,7 @@ public enum MiNiFiProperties {
}
public static LinkedHashMap<String, MiNiFiProperties> sortedPropertiesByKey() {
return Arrays.stream(MiNiFiProperties.values())
return Arrays.stream(values())
.sorted()
.collect(Collectors.toMap(MiNiFiProperties::getKey, Function.identity(), (x, y) -> y, LinkedHashMap::new));
}

@@ -30,7 +30,7 @@
<executions>
<execution>
<id>copy-markdown</id>
<phase>generate-resources</phase>
<phase>process-resources</phase>
<goals>
<goal>copy-resources</goal>
</goals>
@@ -57,7 +57,7 @@
<goals>
<goal>single</goal>
</goals>
<phase>package</phase>
<phase>process-resources</phase>
<configuration>
<descriptors>
<descriptor>src/main/assembly/dependencies.xml</descriptor>

@@ -31,7 +31,6 @@ import static java.nio.file.StandardOpenOption.SYNC;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
import static java.nio.file.StandardOpenOption.WRITE;
import static java.util.Optional.empty;
import static java.util.Optional.of;
import static java.util.Optional.ofNullable;
import static java.util.function.Predicate.not;
import static java.util.stream.Collectors.toList;
@@ -115,7 +114,7 @@ public class FileResourceRepository implements ResourceRepository {
newItems.add(resourceItem);
ResourceRepositoryDescriptor newRepositoryDescriptor = new ResourceRepositoryDescriptor(resourceRepositoryDescriptor.resourcesGlobalHash(), newItems);
persist(newRepositoryDescriptor);
return of(resourceItem);
return Optional.of(resourceItem);
} catch (IOException e) {
LOG.error("Unable to persist repository metadata", e);
return empty();

@@ -38,7 +38,7 @@ limitations under the License.
<goals>
<goal>single</goal>
</goals>
<phase>package</phase>
<phase>process-resources</phase>
<configuration>
<descriptors>
<descriptor>src/main/assembly/dependencies.xml</descriptor>

@@ -61,9 +61,9 @@ public class BaseSchemaWithId extends BaseSchema implements WritableSchema {
public List<String> getValidationIssues() {
List<String> validationIssues = super.getValidationIssues();
if (StringUtil.isNullOrEmpty(id)) {
validationIssues.add(getIssueText(CommonPropertyKeys.ID_KEY, getWrapperName(), IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED));
validationIssues.add(getIssueText(ID_KEY, getWrapperName(), IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED));
} else if (!isValidId(id)) {
validationIssues.add(getIssueText(CommonPropertyKeys.ID_KEY, getWrapperName(), "Id value of " + id + " is not a valid UUID"));
validationIssues.add(getIssueText(ID_KEY, getWrapperName(), "Id value of " + id + " is not a valid UUID"));
}
return validationIssues;
}

@@ -29,7 +29,7 @@ language governing permissions and limitations under the License. -->
<goals>
<goal>unpack-dependencies</goal>
</goals>
<phase>generate-resources</phase>
<phase>prepare-package</phase>
<configuration>
<includeTypes>nar</includeTypes>
<includes>**/docs/**</includes>
@@ -1133,27 +1133,6 @@ language governing permissions and limitations under the License. -->
</plugins>
</build>
</profile>
<profile>
<id>skip-nifi-bin-assembly</id>
<activation>
<property>
<name>skip-nifi-bin-assembly</name>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<id>make shared resource</id>
<phase>none</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>include-all</id>
<properties>

@@ -66,7 +66,7 @@ public class Query {
}
public static ResultType getResultType(final String value) throws AttributeExpressionLanguageParsingException {
return Query.compile(value).getResultType();
return compile(value).getResultType();
}
public static List<ResultType> extractResultTypes(final String value) throws AttributeExpressionLanguageParsingException {
@@ -183,7 +183,7 @@ public class Query {
final Range range = ranges.get(0);
final String expression = value.substring(range.getStart(), range.getEnd() + 1);
Query.compile(expression);
compile(expression);
if (range.getStart() > 0 || range.getEnd() < value.length() - 1) {
throw new AttributeExpressionLanguageParsingException("Found characters outside of Expression");
@@ -191,7 +191,7 @@ public class Query {
} else {
for (final Range range : extractExpressionRanges(value)) {
final String expression = value.substring(range.getStart(), range.getEnd() + 1);
Query.compile(expression);
compile(expression);
}
}
}
@@ -231,7 +231,7 @@ public class Query {
static String evaluateExpressions(final String rawValue, Map<String, String> expressionMap, final AttributeValueDecorator decorator, final Map<String, String> stateVariables,
final ParameterLookup parameterLookup) throws ProcessException {
return Query.prepare(rawValue).evaluateExpressions(new StandardEvaluationContext(expressionMap, stateVariables, parameterLookup), decorator);
return prepare(rawValue).evaluateExpressions(new StandardEvaluationContext(expressionMap, stateVariables, parameterLookup), decorator);
}
static String evaluateExpressions(final String rawValue, final Map<String, String> valueLookup, final ParameterLookup parameterLookup) throws ProcessException {
@@ -240,7 +240,7 @@ public class Query {
static String evaluateExpressions(final String rawValue, final Map<String, String> valueLookup, final AttributeValueDecorator decorator, final ParameterLookup parameterLookup)
throws ProcessException {
return Query.prepare(rawValue).evaluateExpressions(new StandardEvaluationContext(valueLookup, Collections.emptyMap(), parameterLookup), decorator);
return prepare(rawValue).evaluateExpressions(new StandardEvaluationContext(valueLookup, Collections.emptyMap(), parameterLookup), decorator);
}

@@ -59,12 +59,12 @@ final class ValueLookup implements Map<String, String> {
}
if (flowFile != null) {
maps.add(ValueLookup.extractFlowFileProperties(flowFile));
maps.add(extractFlowFileProperties(flowFile));
maps.add(flowFile.getAttributes());
}
}
static final Map<String, String> extractFlowFileProperties(final FlowFile flowFile) {
static Map<String, String> extractFlowFileProperties(final FlowFile flowFile) {
final Map<String, String> flowFileProps = new HashMap<>();
flowFileProps.put("flowFileId", String.valueOf(flowFile.getId()));
flowFileProps.put("fileSize", String.valueOf(flowFile.getSize()));

@@ -42,7 +42,7 @@ public class JsonPathDeleteEvaluator extends JsonPathBaseEvaluator {
public QueryResult<String> evaluate(EvaluationContext context) {
DocumentContext documentContext = getDocumentContext(context);
final JsonPath compiledJsonPath = getJsonPath(context);;
final JsonPath compiledJsonPath = getJsonPath(context);
String result = null;
try {

@@ -30,6 +30,7 @@ public class FlowFilePackagerV1 implements FlowFilePackager {
public static final String FILENAME_ATTRIBUTES = "flowfile.attributes";
public static final String FILENAME_CONTENT = "flowfile.content";
@SuppressWarnings("PMD.AvoidUsingOctalValues")
public static final int DEFAULT_TAR_PERMISSIONS = 0644;
private final int tarPermissions;

@@ -41,7 +41,7 @@ public class HashiCorpVaultProperties {
private final int kvVersion;
private HashiCorpVaultProperties(final HashiCorpVaultPropertiesBuilder builder) {
this.uri = Objects.requireNonNull(builder.uri, "Vault URI is required");;
this.uri = Objects.requireNonNull(builder.uri, "Vault URI is required");
this.authPropertiesFilename = Objects.requireNonNull(builder.authPropertiesFilename, "Vault auth properties filename is required");
this.ssl = new HashiCorpVaultSslProperties(builder.keyStore, builder.keyStoreType, builder.keyStorePassword,
builder.trustStore, builder.trustStoreType, builder.trustStorePassword, builder.enabledTlsCipherSuites, builder.enabledTlsProtocols);

@@ -24,7 +24,7 @@ import org.apache.nifi.hl7.query.evaluator.Evaluator;
public class MessageEvaluator implements Evaluator<HL7Message> {
public HL7Message evaluate(final Map<String, Object> objectMap) {
return (HL7Message) objectMap.get(Evaluator.MESSAGE_KEY);
return (HL7Message) objectMap.get(MESSAGE_KEY);
}
public Class<? extends HL7Message> getType() {

@@ -40,7 +40,7 @@ public class SegmentEvaluator implements Evaluator<List> {
return Collections.emptyList();
}
final HL7Message message = (HL7Message) objectMap.get(Evaluator.MESSAGE_KEY);
final HL7Message message = (HL7Message) objectMap.get(MESSAGE_KEY);
final List<HL7Segment> segments = message.getSegments(segmentType);
return (segments == null) ? Collections.<HL7Segment>emptyList() : segments;
}

@@ -564,13 +564,13 @@ public class NiFiProperties extends ApplicationProperties {
final String propertyKey;
if (isSiteToSiteSecure()) {
if (StringUtils.isBlank(getProperty(NiFiProperties.WEB_HTTPS_PORT_FORWARDING))) {
if (StringUtils.isBlank(getProperty(WEB_HTTPS_PORT_FORWARDING))) {
propertyKey = WEB_HTTPS_PORT;
} else {
propertyKey = WEB_HTTPS_PORT_FORWARDING;
}
} else {
if (StringUtils.isBlank(getProperty(NiFiProperties.WEB_HTTP_PORT_FORWARDING))) {
if (StringUtils.isBlank(getProperty(WEB_HTTP_PORT_FORWARDING))) {
propertyKey = WEB_HTTP_PORT;
} else {
propertyKey = WEB_HTTP_PORT_FORWARDING;
@@ -968,7 +968,7 @@ public class NiFiProperties extends ApplicationProperties {
* @return true if the login identity provider has been configured
*/
public boolean isLoginIdentityProviderEnabled() {
return !StringUtils.isBlank(getProperty(NiFiProperties.SECURITY_USER_LOGIN_IDENTITY_PROVIDER));
return !StringUtils.isBlank(getProperty(SECURITY_USER_LOGIN_IDENTITY_PROVIDER));
}
/**
@@ -1547,7 +1547,7 @@ public class NiFiProperties extends ApplicationProperties {
final String clientSecure = getProperty(ZOOKEEPER_CLIENT_SECURE, defaultValue).trim();
if (!"true".equalsIgnoreCase(clientSecure) && !"false".equalsIgnoreCase(clientSecure)) {
throw new RuntimeException(String.format("%s was '%s', expected true or false", NiFiProperties.ZOOKEEPER_CLIENT_SECURE, clientSecure));
throw new RuntimeException(String.format("%s was '%s', expected true or false", ZOOKEEPER_CLIENT_SECURE, clientSecure));
}
return Boolean.parseBoolean(clientSecure);
@@ -1558,18 +1558,18 @@ public class NiFiProperties extends ApplicationProperties {
final String withEnsembleTracker = getProperty(ZOOKEEPER_CLIENT_ENSEMBLE_TRACKER, defaultValue).trim();
if (!"true".equalsIgnoreCase(withEnsembleTracker) && !"false".equalsIgnoreCase(withEnsembleTracker)) {
throw new RuntimeException(String.format("%s was '%s', expected true or false", NiFiProperties.ZOOKEEPER_CLIENT_ENSEMBLE_TRACKER, withEnsembleTracker));
throw new RuntimeException(String.format("%s was '%s', expected true or false", ZOOKEEPER_CLIENT_ENSEMBLE_TRACKER, withEnsembleTracker));
}
return Boolean.parseBoolean(withEnsembleTracker);
}
public boolean isZooKeeperTlsConfigurationPresent() {
return StringUtils.isNotBlank(getProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE))
&& StringUtils.isNotBlank(getProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE))
&& getProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_PASSWD) != null
&& StringUtils.isNotBlank(getProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE))
&& getProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_PASSWD) != null;
return StringUtils.isNotBlank(getProperty(ZOOKEEPER_CLIENT_SECURE))
&& StringUtils.isNotBlank(getProperty(ZOOKEEPER_SECURITY_KEYSTORE))
&& getProperty(ZOOKEEPER_SECURITY_KEYSTORE_PASSWD) != null
&& StringUtils.isNotBlank(getProperty(ZOOKEEPER_SECURITY_TRUSTSTORE))
&& getProperty(ZOOKEEPER_SECURITY_TRUSTSTORE_PASSWD) != null;
}
public boolean isTlsConfigurationPresent() {
@@ -1835,7 +1835,7 @@ public class NiFiProperties extends ApplicationProperties {
private static void readFromPropertiesFile(String propertiesFilePath, Properties properties) {
final String nfPropertiesFilePath = (propertiesFilePath == null)
? System.getProperty(NiFiProperties.PROPERTIES_FILE_PATH)
? System.getProperty(PROPERTIES_FILE_PATH)
: propertiesFilePath;
if (nfPropertiesFilePath != null) {
final File propertiesFile = new File(nfPropertiesFilePath.trim());

@@ -506,7 +506,7 @@ public class StringUtils {
}
List<String> elements = Arrays.asList(input.trim().toLowerCase().split("\\s"));
return elements.stream()
.filter(word -> !StringUtils.isBlank(word))
.filter(word -> !isBlank(word))
.map(word -> Character.toTitleCase(word.charAt(0)) + word.substring(1))
.collect(Collectors.joining(" "));
}

@@ -107,7 +107,7 @@ public class RecordPathCompiler {
RecordPathSegment parent = root;
for (int i = 0; i < pathTree.getChildCount(); i++) {
final Tree child = pathTree.getChild(i);
parent = RecordPathCompiler.buildPath(child, parent, absolute);
parent = buildPath(child, parent, absolute);
}
// If the given path tree is an operator, create a Filter Function that will be responsible for returning true/false based on the provided operation

@@ -648,7 +648,7 @@ public class MapRecord implements Record {
Object mapObject = values.get(recordField.getFieldName());
if (mapObject == null) {
mapObject = new LinkedHashMap<String, Object>();
mapObject = new LinkedHashMap<>();
}
if (!(mapObject instanceof Map)) {
return;

@@ -21,7 +21,6 @@ import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.io.CompressionInputStream;
import org.apache.nifi.remote.io.CompressionOutputStream;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.Response;
import org.apache.nifi.remote.protocol.ResponseCode;
@@ -193,7 +192,6 @@ public abstract class AbstractTransaction implements Transaction {
+ "; Transaction can only be confirmed when state is " + TransactionState.DATA_EXCHANGED);
}
final CommunicationsSession commsSession = peer.getCommunicationsSession();
if (direction == TransferDirection.RECEIVE) {
if (dataAvailable) {
throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed.");

@@ -143,6 +143,7 @@ public class PeerSelector {
* @param peerCount the number of peers in the remote instance
* @return the normalized weight of this peer
*/
@SuppressWarnings("PMD.AvoidDecimalLiteralsInBigDecimalConstructor")
private static double calculateNormalizedWeight(TransferDirection direction, long totalFlowFileCount, int flowFileCount, int peerCount) {
// If there is only a single remote, send/receive all data to/from it
if (peerCount == 1) {

@@ -528,7 +528,7 @@ public class EndpointConnectionPool implements PeerStatusProvider {
@Override
public String toString() {
return "EndpointConnectionPool[Cluster URL=" + siteInfoProvider.getClusterUrls() + "]";
return "EndpointConnectionPool[Cluster URL=" + siteInfoProvider.getClusterUrls() + " LocalAddress=" + localAddress + "]";
}
private class IdEnrichedRemoteDestination implements RemoteDestination {

@@ -58,7 +58,7 @@ public enum ResponseCode {
private static final ResponseCode[] codeArray = new ResponseCode[256];
static {
for (final ResponseCode responseCode : ResponseCode.values()) {
for (final ResponseCode responseCode : values()) {
codeArray[responseCode.getCode()] = responseCode;
}
}

@@ -126,7 +126,6 @@ import org.apache.nifi.web.api.entity.TransactionResultEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
@@ -258,7 +257,7 @@ public class SiteToSiteRestApiClient implements Closeable {
private void setupCredentialsProvider() {
credentialsProvider = new BasicCredentialsProvider();
if (proxy != null) {
if (!isEmpty(proxy.getUsername()) && !isEmpty(proxy.getPassword())) {
if (StringUtils.isNotEmpty(proxy.getUsername()) && StringUtils.isNotEmpty(proxy.getPassword())) {
credentialsProvider.setCredentials(
new AuthScope(proxy.getHttpHost()),
new UsernamePasswordCredentials(proxy.getUsername(), proxy.getPassword()));
@@ -477,7 +476,7 @@ public class SiteToSiteRestApiClient implements Closeable {
EntityUtils.consume(response.getEntity());
transactionUrl = readTransactionUrl(response);
if (isEmpty(transactionUrl)) {
if (StringUtils.isEmpty(transactionUrl)) {
throw new ProtocolException("Server returned RESPONSE_CODE_CREATED without Location header");
}
final Header transportProtocolVersionHeader = response.getFirstHeader(HttpHeaders.PROTOCOL_VERSION);
@@ -680,7 +679,7 @@ public class SiteToSiteRestApiClient implements Closeable {
}
private boolean shouldCheckProxyAuth() {
return proxy != null && !isEmpty(proxy.getUsername());
return proxy != null && StringUtils.isNotEmpty(proxy.getUsername());
}
public boolean openConnectionForReceive(final String transactionUrl, final Peer peer) throws IOException {
@@ -1199,7 +1198,7 @@ public class SiteToSiteRestApiClient implements Closeable {
}
public String getDescription() {
return !isEmpty(explanation) ? explanation : responseMessage;
return StringUtils.isNotEmpty(explanation) ? explanation : responseMessage;
}
}

@@ -98,7 +98,7 @@ public class FileUtils {
* @return true if given file no longer exists
*/
public static boolean deleteFile(final File file, final Logger logger) {
return FileUtils.deleteFile(file, logger, 1);
return deleteFile(file, logger, 1);
}
/**
@@ -120,7 +120,7 @@ public class FileUtils {
for (int i = 0; i < effectiveAttempts && !isGone; i++) {
isGone = file.delete() || !file.exists();
if (!isGone && (effectiveAttempts - i) > 1) {
FileUtils.sleepQuietly(MILLIS_BETWEEN_ATTEMPTS);
sleepQuietly(MILLIS_BETWEEN_ATTEMPTS);
}
}
if (!isGone && logger != null) {
@@ -142,7 +142,7 @@ public class FileUtils {
* @param logger can be null
*/
public static void deleteFile(final List<File> files, final Logger logger) {
FileUtils.deleteFile(files, logger, 1);
deleteFile(files, logger, 1);
}
/**
@@ -163,7 +163,7 @@ public class FileUtils {
for (int i = 0; i < effectiveAttempts && !isGone; i++) {
isGone = file.delete() || !file.exists();
if (!isGone && (effectiveAttempts - i) > 1) {
FileUtils.sleepQuietly(MILLIS_BETWEEN_ATTEMPTS);
sleepQuietly(MILLIS_BETWEEN_ATTEMPTS);
}
}
if (!isGone && logger != null) {
@@ -186,7 +186,7 @@ public class FileUtils {
* @param logger the logger to use
*/
public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger) {
FileUtils.deleteFilesInDir(directory, filter, logger, false);
deleteFilesInDir(directory, filter, logger, false);
}
/**
@@ -198,7 +198,7 @@ public class FileUtils {
* @param recurse indicates whether to recurse subdirectories
*/
public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse) {
FileUtils.deleteFilesInDir(directory, filter, logger, recurse, false);
deleteFilesInDir(directory, filter, logger, recurse, false);
}
/**
@@ -217,12 +217,12 @@ public class FileUtils {
for (File ingestFile : ingestFiles) {
boolean process = (filter == null) ? true : filter.accept(directory, ingestFile.getName());
if (ingestFile.isFile() && process) {
FileUtils.deleteFile(ingestFile, logger, 3);
deleteFile(ingestFile, logger, 3);
}
if (ingestFile.isDirectory() && recurse) {
FileUtils.deleteFilesInDir(ingestFile, filter, logger, recurse, deleteEmptyDirectories);
deleteFilesInDir(ingestFile, filter, logger, recurse, deleteEmptyDirectories);
if (deleteEmptyDirectories && ingestFile.list().length == 0) {
FileUtils.deleteFile(ingestFile, logger, 3);
deleteFile(ingestFile, logger, 3);
}
}
}
@@ -238,16 +238,16 @@ public class FileUtils {
*/
public static void deleteFiles(final Collection<File> files, final boolean recurse) throws IOException {
for (final File file : files) {
FileUtils.deleteFile(file, recurse);
deleteFile(file, recurse);
}
}
public static void deleteFile(final File file, final boolean recurse) throws IOException {
if (file.isDirectory() && recurse) {
FileUtils.deleteFiles(Arrays.asList(file.listFiles()), recurse);
deleteFiles(Arrays.asList(file.listFiles()), recurse);
}
//now delete the file itself regardless of whether it is plain file or a directory
if (!FileUtils.deleteFile(file, null, 5)) {
if (!deleteFile(file, null, 5)) {
Files.delete(file.toPath());
}
}
@@ -286,12 +286,12 @@ public class FileUtils {
fos.flush();
fos.close();
// Try to delete the file a few times
if (!FileUtils.deleteFile(file, null, 5)) {
if (!deleteFile(file, null, 5)) {
throw new IOException("Failed to delete file after shredding");
}
} finally {
FileUtils.closeQuietly(fos);
closeQuietly(fos);
}
}
@@ -324,8 +324,8 @@ public class FileUtils {
fos.flush();
fileSize = bytes.length;
} finally {
FileUtils.releaseQuietly(outLock);
FileUtils.closeQuietly(fos);
releaseQuietly(outLock);
closeQuietly(fos);
}
return fileSize;
}
@@ -383,23 +383,23 @@ public class FileUtils {
fileSize = in.size();
} while (bytesWritten < fileSize);
out.force(false);
FileUtils.closeQuietly(fos);
FileUtils.closeQuietly(fis);
closeQuietly(fos);
closeQuietly(fis);
fos = null;
fis = null;
if (move && !FileUtils.deleteFile(source, null, 5)) {
if (move && !deleteFile(source, null, 5)) {
if (logger == null) {
FileUtils.deleteFile(destination, null, 5);
deleteFile(destination, null, 5);
throw new IOException("Could not remove file " + source.getAbsolutePath());
} else {
logger.warn("Configured to delete source file when renaming/move not successful. However, unable to delete file at: {}", source.getAbsolutePath());
}
}
} finally {
FileUtils.releaseQuietly(inLock);
FileUtils.releaseQuietly(outLock);
FileUtils.closeQuietly(fos);
FileUtils.closeQuietly(fis);
releaseQuietly(inLock);
releaseQuietly(outLock);
closeQuietly(fos);
closeQuietly(fis);
}
}
return fileSize;
@@ -419,7 +419,7 @@ public class FileUtils {
* @throws SecurityException if a security manager denies the needed file operations
*/
public static long copyFile(final File source, final File destination, final boolean lockInputFile, final boolean lockOutputFile, final Logger logger) throws FileNotFoundException, IOException {
return FileUtils.copyFile(source, destination, lockInputFile, lockOutputFile, false, logger);
return copyFile(source, destination, lockInputFile, lockOutputFile, false, logger);
}
public static long copyFile(final File source, final OutputStream stream, final boolean closeOutputStream, final boolean lockInputFile) throws FileNotFoundException, IOException {
@@ -446,10 +446,10 @@ public class FileUtils {
stream.flush();
fileSize = in.size();
} finally {
FileUtils.releaseQuietly(inLock);
FileUtils.closeQuietly(fis);
releaseQuietly(inLock);
closeQuietly(fis);
if (closeOutputStream) {
FileUtils.closeQuietly(stream);
closeQuietly(stream);
}
}
return fileSize;
@@ -474,7 +474,7 @@ public class FileUtils {
* @throws IOException if rename isn't successful
*/
public static void renameFile(final File source, final File destination, final int maxAttempts) throws IOException {
FileUtils.renameFile(source, destination, maxAttempts, false);
renameFile(source, destination, maxAttempts, false);
}
/**
@@ -494,7 +494,7 @@ public class FileUtils {
for (int i = 0; i < attempts; i++) {
renamed = source.renameTo(destination);
if (!renamed) {
FileUtils.deleteFile(destination, null, 5);
deleteFile(destination, null, 5);
} else {
break; //rename has succeeded
}

@@ -111,7 +111,7 @@ public class AhoCorasick<T> implements Search<T> {
if (root == null) {
throw new IllegalStateException();
}
final SearchState<T> currentState = (state == null) ? new SearchState(root) : state;
final SearchState<T> currentState = (state == null) ? new SearchState<>(root) : state;
if (!findAll && currentState.foundMatch()) {
throw new IllegalStateException("A match has already been found yet we're being asked to keep searching");
}

@@ -52,7 +52,7 @@
<executions>
<execution>
<id>output-html</id>
<phase>prepare-package</phase>
<phase>process-resources</phase>
<goals>
<goal>process-asciidoc</goal>
</goals>
@@ -83,7 +83,7 @@
<version>1.5.3</version>
<executions>
<execution>
<phase>package</phase>
<phase>compile</phase>
<goals>
<goal>replace</goal>
</goals>
@@ -139,7 +139,7 @@
<goals>
<goal>single</goal>
</goals>
<phase>package</phase>
<phase>compile</phase>
<configuration>
<descriptors>
<descriptor>src/main/assembly/dependencies.xml</descriptor>

@@ -235,19 +235,19 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
private Map<String, String> buildAttributes(final BasicProperties properties, final Envelope envelope, String headersFormat, String headerAttributePrefix, boolean removeCurlyBraces,
String valueSeparatorForHeaders) {
final Map<String, String> attributes = new HashMap<>();
addAttribute(attributes, AbstractAMQPProcessor.AMQP_APPID_ATTRIBUTE, properties.getAppId());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_CONTENT_ENCODING_ATTRIBUTE, properties.getContentEncoding());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, properties.getContentType());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_DELIVERY_MODE_ATTRIBUTE, properties.getDeliveryMode());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_PRIORITY_ATTRIBUTE, properties.getPriority());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_CORRELATION_ID_ATTRIBUTE, properties.getCorrelationId());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_REPLY_TO_ATTRIBUTE, properties.getReplyTo());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_EXPIRATION_ATTRIBUTE, properties.getExpiration());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_MESSAGE_ID_ATTRIBUTE, properties.getMessageId());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_TIMESTAMP_ATTRIBUTE, properties.getTimestamp() == null ? null : properties.getTimestamp().getTime());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, properties.getType());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_USER_ID_ATTRIBUTE, properties.getUserId());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_CLUSTER_ID_ATTRIBUTE, properties.getClusterId());
addAttribute(attributes, AMQP_APPID_ATTRIBUTE, properties.getAppId());
addAttribute(attributes, AMQP_CONTENT_ENCODING_ATTRIBUTE, properties.getContentEncoding());
addAttribute(attributes, AMQP_CONTENT_TYPE_ATTRIBUTE, properties.getContentType());
addAttribute(attributes, AMQP_DELIVERY_MODE_ATTRIBUTE, properties.getDeliveryMode());
addAttribute(attributes, AMQP_PRIORITY_ATTRIBUTE, properties.getPriority());
addAttribute(attributes, AMQP_CORRELATION_ID_ATTRIBUTE, properties.getCorrelationId());
addAttribute(attributes, AMQP_REPLY_TO_ATTRIBUTE, properties.getReplyTo());
addAttribute(attributes, AMQP_EXPIRATION_ATTRIBUTE, properties.getExpiration());
addAttribute(attributes, AMQP_MESSAGE_ID_ATTRIBUTE, properties.getMessageId());
addAttribute(attributes, AMQP_TIMESTAMP_ATTRIBUTE, properties.getTimestamp() == null ? null : properties.getTimestamp().getTime());
addAttribute(attributes, AMQP_CONTENT_TYPE_ATTRIBUTE, properties.getType());
addAttribute(attributes, AMQP_USER_ID_ATTRIBUTE, properties.getUserId());
addAttribute(attributes, AMQP_CLUSTER_ID_ATTRIBUTE, properties.getClusterId());
addAttribute(attributes, AMQP_ROUTING_KEY_ATTRIBUTE, envelope.getRoutingKey());
addAttribute(attributes, AMQP_EXCHANGE_ATTRIBUTE, envelope.getExchange());
Map<String, Object> headers = properties.getHeaders();
@@ -255,7 +255,7 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
if (OutputHeaderFormat.ATTRIBUTES.getValue().equals(headersFormat)) {
headers.forEach((key, value) -> addAttribute(attributes, String.format("%s.%s", headerAttributePrefix, key), value));
} else {
addAttribute(attributes, AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE,
addAttribute(attributes, AMQP_HEADERS_ATTRIBUTE,
buildHeaders(properties.getHeaders(), headersFormat, removeCurlyBraces,
valueSeparatorForHeaders));
}
@@ -281,7 +281,7 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
return null;
}
String headerString = null;
if ( OutputHeaderFormat.COMMA_SEPARATED_STRING.getValue().equals(headerFormat)) {
if (OutputHeaderFormat.COMMA_SEPARATED_STRING.getValue().equals(headerFormat)) {
headerString = convertMapToString(headers, valueSeparatorForHeaders);
if (!removeCurlyBraces) {

@@ -255,19 +255,19 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
private BasicProperties extractAmqpPropertiesFromFlowFile(final FlowFile flowFile, final InputHeaderSource selectedHeaderSource, final Character separator, final Pattern pattern) {
final AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, builder::contentType);
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_CONTENT_ENCODING_ATTRIBUTE, builder::contentEncoding);
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_DELIVERY_MODE_ATTRIBUTE, mode -> builder.deliveryMode(Integer.parseInt(mode)));
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_PRIORITY_ATTRIBUTE, pri -> builder.priority(Integer.parseInt(pri)));
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_CORRELATION_ID_ATTRIBUTE, builder::correlationId);
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_REPLY_TO_ATTRIBUTE, builder::replyTo);
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_EXPIRATION_ATTRIBUTE, builder::expiration);
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_MESSAGE_ID_ATTRIBUTE, builder::messageId);
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_TIMESTAMP_ATTRIBUTE, ts -> builder.timestamp(new Date(Long.parseLong(ts))));
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_TYPE_ATTRIBUTE, builder::type);
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_USER_ID_ATTRIBUTE, builder::userId);
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_APPID_ATTRIBUTE, builder::appId);
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_CLUSTER_ID_ATTRIBUTE, builder::clusterId);
readAmqpAttribute(flowFile, AMQP_CONTENT_TYPE_ATTRIBUTE, builder::contentType);
readAmqpAttribute(flowFile, AMQP_CONTENT_ENCODING_ATTRIBUTE, builder::contentEncoding);
readAmqpAttribute(flowFile, AMQP_DELIVERY_MODE_ATTRIBUTE, mode -> builder.deliveryMode(Integer.parseInt(mode)));
readAmqpAttribute(flowFile, AMQP_PRIORITY_ATTRIBUTE, pri -> builder.priority(Integer.parseInt(pri)));
readAmqpAttribute(flowFile, AMQP_CORRELATION_ID_ATTRIBUTE, builder::correlationId);
readAmqpAttribute(flowFile, AMQP_REPLY_TO_ATTRIBUTE, builder::replyTo);
readAmqpAttribute(flowFile, AMQP_EXPIRATION_ATTRIBUTE, builder::expiration);
readAmqpAttribute(flowFile, AMQP_MESSAGE_ID_ATTRIBUTE, builder::messageId);
readAmqpAttribute(flowFile, AMQP_TIMESTAMP_ATTRIBUTE, ts -> builder.timestamp(new Date(Long.parseLong(ts))));
readAmqpAttribute(flowFile, AMQP_TYPE_ATTRIBUTE, builder::type);
readAmqpAttribute(flowFile, AMQP_USER_ID_ATTRIBUTE, builder::userId);
readAmqpAttribute(flowFile, AMQP_APPID_ATTRIBUTE, builder::appId);
readAmqpAttribute(flowFile, AMQP_CLUSTER_ID_ATTRIBUTE, builder::clusterId);
Map<String, Object> headers = prepareAMQPHeaders(flowFile, selectedHeaderSource, separator, pattern);
builder.headers(headers);
@@ -286,7 +286,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
if (InputHeaderSource.FLOWFILE_ATTRIBUTES.equals(selectedHeaderSource)) {
headers.putAll(getMatchedAttributes(flowFile.getAttributes(), pattern));
} else if (InputHeaderSource.AMQP_HEADERS_ATTRIBUTE.equals(selectedHeaderSource)) {
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, value -> headers.putAll(validateAMQPHeaderProperty(value, headerSeparator)));
readAmqpAttribute(flowFile, AMQP_HEADERS_ATTRIBUTE, value -> headers.putAll(validateAMQPHeaderProperty(value, headerSeparator)));
}
return headers;
}

@@ -24,8 +24,6 @@ import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
@@ -33,7 +31,6 @@ import java.util.OptionalInt;
public class SchemaUtils {
private static final Logger logger = LoggerFactory.getLogger(SchemaUtils.class);
private static final ObjectMapper OBJECT_MAPPER = setObjectMapper();
private SchemaUtils() {

@@ -25,8 +25,6 @@ import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.serialization.record.StandardSchemaIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Field;
import java.util.ArrayList;
@@ -37,8 +35,6 @@ import java.util.stream.Collectors;
public class RecordSchemaProvider {
private static final Logger LOG = LoggerFactory.getLogger(RecordSchemaProvider.class);
private final LoadingCache<Class, RecordSchema> schemaCache = Caffeine.newBuilder()
.maximumSize(100)
.build(this::generateRecordSchema);

@@ -27,7 +27,6 @@ import org.apache.nifi.serialization.record.type.ArrayDataType;
import java.lang.reflect.Field;
import java.util.List;
import static org.apache.nifi.jasn1.JASN1Utils.getSeqOfElementType;
import static org.apache.nifi.jasn1.JASN1Utils.invokeGetter;
import static org.apache.nifi.jasn1.JASN1Utils.toGetterMethod;
@@ -59,7 +58,7 @@ public class BerArrayConverter implements JASN1TypeAndValueConverter {
// Use the generic type of seqOf field to determine the getter method name.
final Field seqOfField = value.getClass().getDeclaredField("seqOf");
final Class seqOf = getSeqOfElementType(seqOfField);
final Class seqOf = JASN1Utils.getSeqOfElementType(seqOfField);
final String getterMethod = toGetterMethod(seqOf.getSimpleName());
final DataType elementType = ((ArrayDataType) dataType).getElementType();

@@ -34,7 +34,7 @@ public enum AwsSignerType implements DescribedValue {
private static final Map<String, AwsSignerType> LOOKUP_MAP = new HashMap<>();
static {
for (AwsSignerType signerType : AwsSignerType.values()) {
for (AwsSignerType signerType : values()) {
LOOKUP_MAP.put(signerType.getValue(), signerType);
}
}

@@ -158,7 +158,7 @@ public class PutKinesisStream extends AbstractAwsSyncProcessor<KinesisClient, Ki
session.exportTo(flowFile, baos);
final PutRecordsRequestEntry.Builder recordBuilder = PutRecordsRequestEntry.builder().data(SdkBytes.fromByteArray(baos.toByteArray()));
final String partitionKey = context.getProperty(PutKinesisStream.KINESIS_PARTITION_KEY)
final String partitionKey = context.getProperty(KINESIS_PARTITION_KEY)
.evaluateAttributeExpressions(flowFile).getValue();
recordBuilder.partitionKey(StringUtils.isBlank(partitionKey) ? Integer.toString(randomPartitionKeyGenerator.nextInt()) : partitionKey);

@@ -29,7 +29,7 @@ public enum TextractType {
DOCUMENT_TEXT_DETECTION("Document Text Detection"),
EXPENSE_ANALYSIS("Expense Analysis");
public static final Set<String> TEXTRACT_TYPES = Arrays.stream(TextractType.values()).map(TextractType::getType)
public static final Set<String> TEXTRACT_TYPES = Arrays.stream(values()).map(TextractType::getType)
.collect(collectingAndThen(toSet(), Collections::unmodifiableSet));
public final String type;

@@ -1066,7 +1066,7 @@ public class PutS3Object extends AbstractS3Processor {
} else {
first = false;
}
buf.append(java.lang.String.format("%d/%s", tag.getPartNumber(), tag.getETag()));
buf.append(String.format("%d/%s", tag.getPartNumber(), tag.getETag()));
}
}
buf.append(SEPARATOR)

@@ -70,7 +70,7 @@ public class ServerSideCEncryptionStrategy implements S3EncryptionStrategy {
byte[] keyMaterial;
try {
if (!org.apache.commons.codec.binary.Base64.isBase64(keyValue)) {
if (!Base64.isBase64(keyValue)) {
throw new Exception();
}
keyMaterial = Base64.decodeBase64(keyValue);

@@ -65,9 +65,8 @@ public final class AzureEventHubUtils {
.description("To support namespaces not in the default windows.net domain.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues(AzureEventHubUtils.AZURE_ENDPOINT, AzureEventHubUtils.AZURE_CHINA_ENDPOINT,
AzureEventHubUtils.AZURE_GERMANY_ENDPOINT, AzureEventHubUtils.AZURE_US_GOV_ENDPOINT)
.defaultValue(AzureEventHubUtils.AZURE_ENDPOINT)
.allowableValues(AZURE_ENDPOINT, AZURE_CHINA_ENDPOINT, AZURE_GERMANY_ENDPOINT, AZURE_US_GOV_ENDPOINT)
.defaultValue(AZURE_ENDPOINT)
.required(true)
.build();

@@ -342,7 +342,7 @@ public class CopyAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
private static String generateSas(final BlobContainerClient sourceContainerClient) {
final BlobContainerSasPermission permissions = new BlobContainerSasPermission().setCreatePermission(true).setWritePermission(true).setAddPermission(true).setReadPermission(true);
final OffsetDateTime now = OffsetDateTime.now(ZoneOffset.UTC);
final OffsetDateTime expiryTime = now.plusHours(CopyAzureBlobStorage_v12.GENERATE_SAS_EXPIRY_HOURS);
final OffsetDateTime expiryTime = now.plusHours(GENERATE_SAS_EXPIRY_HOURS);
final BlobServiceSasSignatureValues signatureValues = new BlobServiceSasSignatureValues(expiryTime, permissions);
return sourceContainerClient.generateSas(signatureValues);
}

@@ -32,7 +32,7 @@ public enum KustoIngestionResult {
}
public static KustoIngestionResult fromString(String status) {
for (KustoIngestionResult result : KustoIngestionResult.values()) {
for (KustoIngestionResult result : values()) {
if (result.status.equalsIgnoreCase(status)) {
return result;
}

@@ -237,7 +237,7 @@ public class PutDropbox extends AbstractProcessor implements DropboxTrait {
} catch (UploadErrorException e) {
handleUploadError(conflictResolution, uploadPath, e);
} catch (UploadSessionFinishErrorException e) {
handleUploadError(conflictResolution, uploadPath, e);
handleUploadSessionError(conflictResolution, uploadPath, e);
} catch (RateLimitException e) {
context.yield();
throw new ProcessException("Dropbox API rate limit exceeded while uploading file", e);
@@ -268,7 +268,7 @@ public class PutDropbox extends AbstractProcessor implements DropboxTrait {
}
}
private void handleUploadError(final ConflictResolutionStrategy conflictResolution, final String uploadPath, final UploadSessionFinishErrorException e) {
private void handleUploadSessionError(final ConflictResolutionStrategy conflictResolution, final String uploadPath, final UploadSessionFinishErrorException e) {
if (e.errorValue.isPath() && e.errorValue.getPathValue().isConflict()) {
handleConflict(conflictResolution, uploadPath, e);
} else {

@@ -196,8 +196,6 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
private volatile ProcessSession processSession;
private volatile boolean shouldSetDeleteFlag;
protected volatile Optional<OAuth2AccessTokenProvider> oauth2AccessTokenProviderOptional;
protected volatile AccessToken oauth2AccessDetails;
@@ -313,7 +311,6 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
this.processSession = processSession;
this.messageReceiver = this.buildMessageReceiver(context);
this.shouldSetDeleteFlag = context.getProperty(SHOULD_DELETE_MESSAGES).asBoolean();
int fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
this.messageReceiver.setMaxFetchSize(fetchSize);

@@ -49,12 +49,10 @@ public class ChunkHeader extends Block {
private final Map<Integer, NameStringNode> nameStrings;
private final Map<Integer, TemplateNode> templateNodes;
private final int chunkNumber;
private final ComponentLog log;
private UnsignedLong recordNumber;
public ChunkHeader(BinaryReader binaryReader, ComponentLog log, long headerOffset, int chunkNumber) throws IOException {
super(binaryReader, headerOffset);
this.log = log;
this.chunkNumber = chunkNumber;
CRC32 crc32 = new CRC32();
crc32.update(binaryReader.peekBytes(120));

@@ -22,6 +22,7 @@ package org.apache.nifi.processors.evtx.parser;
* Chunks are independent within the file so we should be able to safely continue processing the remaining chunks.
*/
public class MalformedChunkException extends Exception {
@SuppressWarnings("PMD.UnusedPrivateField")
private final long offset;
private final int chunkNum;
private byte[] badChunk;

@@ -30,6 +30,7 @@ import java.util.List;
*/
public class ConditionalSubstitutionNode extends BxmlNodeWithToken {
private final int index;
@SuppressWarnings("PMD.UnusedPrivateField")
private final int type;
public ConditionalSubstitutionNode(BinaryReader binaryReader, ChunkHeader chunkHeader, BxmlNode parent) throws IOException {

@@ -30,6 +30,7 @@ import java.util.List;
*/
public class NormalSubstitutionNode extends BxmlNodeWithToken {
private final int index;
@SuppressWarnings("PMD.UnusedPrivateField")
private final int type;
public NormalSubstitutionNode(BinaryReader binaryReader, ChunkHeader chunkHeader, BxmlNode parent) throws IOException {

@@ -29,7 +29,9 @@ import java.io.IOException;
* Open tag in the template xml
*/
public class OpenStartElementNode extends BxmlNodeWithToken {
@SuppressWarnings("PMD.UnusedPrivateField")
private final int unknown;
@SuppressWarnings("PMD.UnusedPrivateField")
private final UnsignedInteger size;
private final int stringOffset;
private final String tagName;

@@ -27,6 +27,7 @@ import java.util.List;
public class ProcessingInstructionDataNode extends BxmlNodeWithToken {
private final int stringLength;
@SuppressWarnings("PMD.UnusedPrivateField")
private final int tagLength;
private final String data;

@@ -30,7 +30,9 @@ import java.util.List;
* Node denoting the beginning of a stream (generally present before the TemplateInstanceNode)
*/
public class StreamStartNode extends BxmlNodeWithToken {
@SuppressWarnings("PMD.UnusedPrivateField")
private final int unknown;
@SuppressWarnings("PMD.UnusedPrivateField")
private final int unknown2;
public StreamStartNode(BinaryReader binaryReader, ChunkHeader chunkHeader, BxmlNode parent) throws IOException {

@@ -31,10 +31,11 @@ import java.util.List;
* Instance of a Template
*/
public class TemplateInstanceNode extends BxmlNodeWithToken {
@SuppressWarnings("PMD.UnusedPrivateField")
private final int unknown;
private final UnsignedInteger templateId;
private final int templateOffset;
@SuppressWarnings("PMD.UnusedPrivateField")
private final boolean isResident;
private final TemplateNode templateNode;
private final int templateLength;

@@ -30,6 +30,7 @@ import java.util.List;
* Parent class for variant nodes (they all have no children)
*/
public abstract class VariantTypeNode extends BxmlNode {
@SuppressWarnings("PMD.UnusedPrivateField")
private final int length;
public VariantTypeNode(BinaryReader binaryReader, ChunkHeader chunkHeader, BxmlNode parent, int length) throws IOException {

@@ -28,7 +28,7 @@ public enum ConflictResolutionStrategy implements DescribedValue {
private static final Map<String, ConflictResolutionStrategy> ENUM_MAP = new HashMap<>();
static {
for (ConflictResolutionStrategy strategy : ConflictResolutionStrategy.values()) {
for (ConflictResolutionStrategy strategy : values()) {
ENUM_MAP.put(strategy.getValue(), strategy);
}
}

@@ -23,7 +23,8 @@ import java.util.Map;
/**
* EventFactory to create StandardEvent instances.
*/
public class StandardEventFactory implements EventFactory<StandardEvent> {
@SuppressWarnings("PMD.UseDiamondOperator")
public class StandardEventFactory<T extends Event<?>> implements EventFactory<StandardEvent> {
@Override
public StandardEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) {

@@ -156,10 +156,8 @@ public class SecurityUtil {
if (ugi == null) {
try {
result = action.run();
} catch (IOException ioe) {
throw ioe;
} catch (RuntimeException re) {
throw re;
} catch (IOException | RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}

@@ -142,7 +142,7 @@ public class ListedEntityTracker<T extends ListableEntity> {
             return null;
         }
         try (final GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(v))) {
-            return objectMapper.readValue(in, new TypeReference<Map<String, ListedEntity>>() { });
+            return objectMapper.readValue(in, new TypeReference<>() { });
         }
     };

@@ -174,8 +174,8 @@ public class ListedEntityTracker<T extends ListableEntity> {
     }

     static void validateProperties(ValidationContext context, Collection<ValidationResult> results, Scope scope) {
-        validateRequiredProperty(context, results, ListedEntityTracker.TRACKING_STATE_CACHE);
-        validateRequiredProperty(context, results, ListedEntityTracker.TRACKING_TIME_WINDOW);
+        validateRequiredProperty(context, results, TRACKING_STATE_CACHE);
+        validateRequiredProperty(context, results, TRACKING_TIME_WINDOW);

         if (Scope.LOCAL.equals(scope)
                 && StringUtils.isEmpty(context.getProperty(NODE_IDENTIFIER).evaluateAttributeExpressions().getValue())) {

@@ -238,7 +238,7 @@ public class ListedEntityTracker<T extends ListableEntity> {
         mapCacheClient = context.getProperty(TRACKING_STATE_CACHE).asControllerService(DistributedMapCacheClient.class);
         this.scope = scope;
         if (Scope.LOCAL.equals(scope)) {
-            nodeId = context.getProperty(ListedEntityTracker.NODE_IDENTIFIER).evaluateAttributeExpressions().getValue();
+            nodeId = context.getProperty(NODE_IDENTIFIER).evaluateAttributeExpressions().getValue();
         } else {
             nodeId = null;
         }
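The TypeReference change above leans on the diamond operator for anonymous classes (Java 9+), where the captured type is taken from the assignment target. A standalone Jackson sketch with a hypothetical payload:

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.util.Map;

    public class DiamondTypeReferenceExample {

        public static void main(String[] args) throws Exception {
            final ObjectMapper objectMapper = new ObjectMapper();
            // The Map<String, Long> target type determines the TypeReference's
            // type argument, so it does not need to be repeated on the right.
            final Map<String, Long> counts = objectMapper.readValue("{\"a\":1,\"b\":2}", new TypeReference<>() { });
            System.out.println(counts);
        }
    }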
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.sql.Blob;
+import java.sql.Time;
 import java.sql.Timestamp;
 import java.time.Duration;
 import java.time.Instant;

@@ -505,7 +506,7 @@ public class AvroTypeUtil {
         for (final Field field : avroSchema.getFields()) {
             final String fieldName = field.name();
             final Schema fieldSchema = field.schema();
-            final DataType dataType = AvroTypeUtil.determineDataType(fieldSchema, knownRecords);
+            final DataType dataType = determineDataType(fieldSchema, knownRecords);
             final boolean nullable = isNullable(fieldSchema);
             addFieldToList(recordFields, field, fieldName, fieldSchema, dataType, nullable);
         }

@@ -694,7 +695,7 @@
     }

     private static Long getLongFromTimestamp(final Object rawValue, final Schema fieldSchema, final String fieldName) {
-        final String format = AvroTypeUtil.determineDataType(fieldSchema).getFormat();
+        final String format = determineDataType(fieldSchema).getFormat();
         final FieldConverter<Object, Timestamp> converter = StandardFieldConverterRegistry.getRegistry().getFieldConverter(Timestamp.class);
         final Timestamp timestamp = converter.convertField(rawValue, Optional.ofNullable(format), fieldName);
         return timestamp.getTime();

@@ -715,12 +716,12 @@
                 if (LOGICAL_TYPE_DATE.equals(logicalType.getName())) {
-                    final String format = AvroTypeUtil.determineDataType(fieldSchema).getFormat();
+                    final String format = determineDataType(fieldSchema).getFormat();
                     final FieldConverter<Object, LocalDate> fieldConverter = StandardFieldConverterRegistry.getRegistry().getFieldConverter(LocalDate.class);
                     final LocalDate localDate = fieldConverter.convertField(rawValue, Optional.ofNullable(format), fieldName);
                     return (int) localDate.toEpochDay();
                 } else if (LOGICAL_TYPE_TIME_MILLIS.equals(logicalType.getName())) {
-                    final String format = AvroTypeUtil.determineDataType(fieldSchema).getFormat();
+                    final String format = determineDataType(fieldSchema).getFormat();
                     return getLogicalTimeMillis(rawValue, format, fieldName);
                 }

@@ -794,7 +795,7 @@
                 return new GenericData.Fixed(fieldSchema, rawBytes);
             } else {
-                return AvroTypeUtil.convertByteArray((Object[]) rawValue);
+                return convertByteArray((Object[]) rawValue);
             }
         }
         try {

@@ -997,7 +998,7 @@
             }

             foundNonNull = true;
-            final DataType desiredDataType = AvroTypeUtil.determineDataType(subSchema);
+            final DataType desiredDataType = determineDataType(subSchema);
             try {
                 final Object convertedValue = conversion.apply(subSchema);

@@ -1078,7 +1079,7 @@
                     return java.sql.Date.valueOf(LocalDate.ofEpochDay((int) value));
                 } else if (LOGICAL_TYPE_TIME_MILLIS.equals(logicalName)) {
                     // time-millis logical name means that the value is number of milliseconds since midnight.
-                    return new java.sql.Time((int) value);
+                    return new Time((int) value);
                 }

                 break;

@@ -1091,11 +1092,11 @@
                 final String logicalName = logicalType.getName();
                 if (LOGICAL_TYPE_TIME_MICROS.equals(logicalName)) {
-                    return new java.sql.Time(TimeUnit.MICROSECONDS.toMillis((long) value));
+                    return new Time(TimeUnit.MICROSECONDS.toMillis((long) value));
                 } else if (LOGICAL_TYPE_TIMESTAMP_MILLIS.equals(logicalName)) {
-                    return new java.sql.Timestamp((long) value);
+                    return new Timestamp((long) value);
                 } else if (LOGICAL_TYPE_TIMESTAMP_MICROS.equals(logicalName)) {
-                    return new java.sql.Timestamp(TimeUnit.MICROSECONDS.toMillis((long) value));
+                    return new Timestamp(TimeUnit.MICROSECONDS.toMillis((long) value));
                 }
                 break;
             }

@@ -1115,7 +1116,7 @@
                     final Object fieldValue = normalizeValue(avroFieldValue, field.schema(), fieldName + "/" + field.name());
                     values.put(field.name(), fieldValue);
                 }
-                final RecordSchema childSchema = AvroTypeUtil.createSchema(recordSchema, false);
+                final RecordSchema childSchema = createSchema(recordSchema, false);
                 return new MapRecord(childSchema, values);
             case BYTES:
                 final ByteBuffer bb = (ByteBuffer) value;

@@ -1123,7 +1124,7 @@
                 if (logicalType != null && LOGICAL_TYPE_DECIMAL.equals(logicalType.getName())) {
                     return new Conversions.DecimalConversion().fromBytes(bb, avroSchema, logicalType);
                 }
-                return AvroTypeUtil.convertByteArray(bb.array());
+                return convertByteArray(bb.array());
             case FIXED:
                 final GenericFixed fixed = (GenericFixed) value;
                 final LogicalType fixedLogicalType = avroSchema.getLogicalType();

@@ -1131,7 +1132,7 @@
                     final ByteBuffer fixedByteBuffer = ByteBuffer.wrap(fixed.bytes());
                     return new Conversions.DecimalConversion().fromBytes(fixedByteBuffer, avroSchema, fixedLogicalType);
                 }
-                return AvroTypeUtil.convertByteArray(fixed.bytes());
+                return convertByteArray(fixed.bytes());
             case ENUM:
                 return value.toString();
             case NULL:
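Most of the AvroTypeUtil edits simply drop the redundant class qualifier on calls to the class's own static methods. In isolation the before/after looks like the following sketch; the utility class here is hypothetical:

    public final class MathUtil {

        private MathUtil() {
        }

        public static int square(final int value) {
            return value * value;
        }

        public static int sumOfSquares(final int a, final int b) {
            // square(a) instead of MathUtil.square(a): the qualifier is redundant
            // when calling a static member of the enclosing class.
            return square(a) + square(b);
        }
    }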
@@ -154,7 +154,7 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
     public abstract HDFSRecordReader createHDFSRecordReader(final ProcessContext context, final FlowFile flowFile, final Configuration conf, final Path path)
             throws IOException;


+    @SuppressWarnings("PMD.IdenticalCatchBranches")
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         // do this before getting a flow file so that we always get a chance to attempt Kerberos relogin
@@ -55,7 +55,7 @@ public class MockSchemaRegistry extends AbstractControllerService implements Sch
     RecordSchema retrieveSchemaByName(final SchemaIdentifier schemaIdentifier) throws SchemaNotFoundException {
         final Optional<String> schemaName = schemaIdentifier.getName();
         if (schemaName.isEmpty()) {
-            throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Name is not present");
+            throw new SchemaNotFoundException("Cannot retrieve schema because Schema Name is not present");
         }

         final String schemaBranch = schemaIdentifier.getBranch().orElse(null);

@@ -66,12 +66,12 @@ public class MockSchemaRegistry extends AbstractControllerService implements Sch
     private RecordSchema retrieveSchemaByIdAndVersion(final SchemaIdentifier schemaIdentifier) throws SchemaNotFoundException {
         final OptionalLong schemaId = schemaIdentifier.getIdentifier();
         if (schemaId.isEmpty()) {
-            throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Id is not present");
+            throw new SchemaNotFoundException("Cannot retrieve schema because Schema Id is not present");
         }

         final OptionalInt version = schemaIdentifier.getVersion();
         if (version.isEmpty()) {
-            throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Version is not present");
+            throw new SchemaNotFoundException("Cannot retrieve schema because Schema Version is not present");
         }

         final Tuple<Long, Integer> tuple = new Tuple<>(schemaId.getAsLong(), version.getAsInt());
@@ -286,7 +286,7 @@ public class CSVUtils {
         final Character quoteChar = getCharUnescaped(context, QUOTE_CHAR, variables);
         builder = builder.setQuote(quoteChar);

-        final Character escapeChar = context.getProperty(CSVUtils.ESCAPE_CHAR).evaluateAttributeExpressions(variables).getValue().isEmpty() ? null : getCharUnescaped(context, ESCAPE_CHAR, variables);
+        final Character escapeChar = context.getProperty(ESCAPE_CHAR).evaluateAttributeExpressions(variables).getValue().isEmpty() ? null : getCharUnescaped(context, ESCAPE_CHAR, variables);
         builder = builder.setEscape(escapeChar);

         builder = builder.setTrim(context.getProperty(TRIM_FIELDS).asBoolean());
@@ -87,7 +87,7 @@ abstract public class AbstractGetGcpVisionAnnotateOperationStatus extends Abstra
             return;
         }
         try {
-            String operationKey = context.getProperty(OPERATION_KEY).evaluateAttributeExpressions(flowFile).getValue();;
+            String operationKey = context.getProperty(OPERATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
             Operation operation = getVisionClient().getOperationsClient().getOperation(operationKey);
             getLogger().info("{}", operation);
             if (operation.getDone() && !operation.hasError()) {
@@ -488,7 +488,7 @@ public class TinkerpopClientService extends AbstractControllerService implements
             Map.Entry<String, Object> tempResult = (Map.Entry<String, Object>) resultSet.next();
             Map<String, Object> tempRetObject = new HashMap<>();
             tempRetObject.put(tempResult.getKey(), tempResult.getValue());
-            SimpleEntry returnObject = new SimpleEntry<String, Object>(tempResult.getKey(), tempRetObject);
+            SimpleEntry<String, Object> returnObject = new SimpleEntry<>(tempResult.getKey(), tempRetObject);
             Map<String, Object> resultReturnMap = new HashMap<>();
             resultReturnMap.put(innerResultSet.getKey(), returnObject);
             if (getLogger().isDebugEnabled()) {
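A compact sketch of the UseDiamondOperator rule behind the change above, using hypothetical values: the constructor's type arguments are inferred from the declared variable type.

    import java.util.AbstractMap.SimpleEntry;
    import java.util.HashMap;
    import java.util.Map;

    public class DiamondOperatorExample {

        public static void main(String[] args) {
            // new HashMap<>() and new SimpleEntry<>(...) let the compiler infer
            // <String, Object> from the left-hand side instead of repeating it.
            final Map<String, Object> values = new HashMap<>();
            values.put("count", 42);

            final SimpleEntry<String, Object> entry = new SimpleEntry<>("result", values);
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }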
@@ -20,16 +20,9 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-/**
- *
- */
 public final class Utils {

     private static final Logger logger = LoggerFactory.getLogger(Utils.class);

     /**
      * Creates new instance of the class specified by 'className' by first
      * loading it using thread context class loader and then executing default
@@ -89,7 +89,7 @@ public class JoltTransformJSON extends AbstractJoltTransform {
             .build();

     private static final List<PropertyDescriptor> PROPERTIES;
-    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);;
+    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);
     private volatile ClassLoader customClassLoader;
     private volatile JsonUtil jsonUtil;

@@ -97,7 +97,7 @@ language governing permissions and limitations under the License. -->
                 <executions>
                     <execution>
                         <id>unpack-nifi-jolt-transform-json-ui</id>
-                        <phase>generate-resources</phase>
+                        <phase>prepare-package</phase>
                         <goals>
                             <goal>unpack-dependencies</goal>
                         </goals>
@@ -278,8 +278,8 @@ public class PutMongoRecord extends AbstractMongoProcessor {
         return retVal;
     }

-    private List convertArrays(Object[] input) {
-        List retVal = new ArrayList();
+    private List<Object> convertArrays(Object[] input) {
+        List<Object> retVal = new ArrayList<>();
         for (Object o : input) {
             if (o != null && o.getClass().isArray()) {
                 retVal.add(convertArrays((Object[]) o));
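A self-contained version of the raw-type cleanup shown above; the method body mirrors the diff, while the `main` driver is illustrative only. Parameterizing the list removes unchecked warnings without changing the recursive flattening:

    import java.util.ArrayList;
    import java.util.List;

    public class ArrayConversionExample {

        // Nested Object[] values are converted into nested List<Object> values.
        static List<Object> convertArrays(final Object[] input) {
            final List<Object> converted = new ArrayList<>();
            for (final Object o : input) {
                if (o != null && o.getClass().isArray()) {
                    converted.add(convertArrays((Object[]) o));
                } else {
                    converted.add(o);
                }
            }
            return converted;
        }

        public static void main(String[] args) {
            System.out.println(convertArrays(new Object[] {"a", new Object[] {"b", "c"}}));
        }
    }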
@@ -174,7 +174,7 @@ public class RunMongoAggregation extends AbstractMongoProcessor {
         try {
             MongoCollection<Document> collection = getCollection(context, flowFile);
             List<Bson> aggQuery = buildAggregationQuery(query);
-            AggregateIterable<Document> it = collection.aggregate(aggQuery).allowDiskUse(allowDiskUse);;
+            AggregateIterable<Document> it = collection.aggregate(aggQuery).allowDiskUse(allowDiskUse);
             it.batchSize(batchSize != null ? batchSize : 1);

             iter = it.iterator();
@@ -122,19 +122,17 @@ public class PutGridFS extends AbstractGridFSProcessor {
     static final Set<Relationship> RELATIONSHIP_SET;

     static {
-        List _temp = new ArrayList<>();
-        _temp.addAll(PARENT_PROPERTIES);
-        _temp.add(FILE_NAME);
-        _temp.add(PROPERTIES_PREFIX);
-        _temp.add(ENFORCE_UNIQUENESS);
-        _temp.add(HASH_ATTRIBUTE);
-        _temp.add(CHUNK_SIZE);
-        DESCRIPTORS = Collections.unmodifiableList(_temp);
+        List<PropertyDescriptor> propertyDescriptors = new ArrayList<>(PARENT_PROPERTIES);
+        propertyDescriptors.add(FILE_NAME);
+        propertyDescriptors.add(PROPERTIES_PREFIX);
+        propertyDescriptors.add(ENFORCE_UNIQUENESS);
+        propertyDescriptors.add(HASH_ATTRIBUTE);
+        propertyDescriptors.add(CHUNK_SIZE);
+        DESCRIPTORS = Collections.unmodifiableList(propertyDescriptors);

-        Set _rels = new HashSet();
-        _rels.addAll(PARENT_RELATIONSHIPS);
-        _rels.add(REL_DUPLICATE);
-        RELATIONSHIP_SET = Collections.unmodifiableSet(_rels);
+        Set<Relationship> relationships = new HashSet<>(PARENT_RELATIONSHIPS);
+        relationships.add(REL_DUPLICATE);
+        RELATIONSHIP_SET = Collections.unmodifiableSet(relationships);
     }

     private String uniqueness;
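The PutGridFS cleanup swaps a raw `_temp` list plus `addAll` for a typed copy constructor. A minimal sketch of the same idiom, with hypothetical String constants standing in for the property descriptors:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class DescriptorListExample {

        private static final List<String> PARENT_PROPERTIES = List.of("Hostname", "Port");

        static final List<String> DESCRIPTORS;

        static {
            // The copy constructor seeds the typed list with the parent entries,
            // then component-specific entries are appended before freezing it.
            final List<String> propertyDescriptors = new ArrayList<>(PARENT_PROPERTIES);
            propertyDescriptors.add("File Name");
            propertyDescriptors.add("Chunk Size");
            DESCRIPTORS = Collections.unmodifiableList(propertyDescriptors);
        }

        public static void main(String[] args) {
            System.out.println(DESCRIPTORS);
        }
    }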
@@ -42,7 +42,6 @@ public class WriteParquetResult extends AbstractRecordSetWriter {

     private final Schema schema;
     private final ParquetWriter<GenericRecord> parquetWriter;
-    private final ComponentLog componentLogger;
     private SchemaAccessWriter accessWriter;
     private RecordSchema recordSchema;

@@ -50,7 +49,6 @@ public class WriteParquetResult extends AbstractRecordSetWriter {
                              final ParquetConfig parquetConfig, final ComponentLog componentLogger) throws IOException {
         super(out);
         this.schema = avroSchema;
-        this.componentLogger = componentLogger;
         this.accessWriter = accessWriter;
         this.recordSchema = recordSchema;

@@ -104,7 +104,7 @@
                         <goals>
                             <goal>unpack-dependencies</goal>
                         </goals>
-                        <phase>generate-test-resources</phase>
+                        <phase>prepare-package</phase>
                         <configuration>
                             <includeArtifactIds>nifi-python-framework</includeArtifactIds>
                             <excludeTransitive>true</excludeTransitive>

@@ -119,7 +119,7 @@
                         <goals>
                             <goal>unpack-dependencies</goal>
                         </goals>
-                        <phase>generate-test-resources</phase>
+                        <phase>prepare-package</phase>
                         <configuration>
                             <includeArtifactIds>nifi-python-extension-api</includeArtifactIds>
                             <excludeTransitive>true</excludeTransitive>

@@ -134,7 +134,7 @@
                         <goals>
                             <goal>unpack-dependencies</goal>
                         </goals>
-                        <phase>generate-test-resources</phase>
+                        <phase>prepare-package</phase>
                         <configuration>
                             <includeArtifactIds>nifi-python-test-extensions</includeArtifactIds>
                             <excludeTransitive>true</excludeTransitive>
@@ -264,10 +264,8 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT

             try {
                 return convertJsonNodeToRecord(nextNode, getSchema(), null, coerceTypes, dropUnknownFields);
-            } catch (final MalformedRecordException mre) {
-                throw mre;
-            } catch (final IOException ioe) {
-                throw ioe;
+            } catch (final MalformedRecordException | IOException e) {
+                throw e;
             } catch (final Exception e) {
                 throw new MalformedRecordException("Failed to convert data into a Record object with the given schema", e);
             }
@@ -26,8 +26,8 @@ import java.util.Optional;
 public class V1V2cSNMPFactory extends SNMPManagerFactory implements SNMPContext {

     @Override
-    public Target createTargetInstance(final SNMPConfiguration configuration) {
-        final Target communityTarget = new CommunityTarget();
+    public Target<?> createTargetInstance(final SNMPConfiguration configuration) {
+        final Target<?> communityTarget = new CommunityTarget<>();
         setupTargetBasicProperties(communityTarget, configuration);
         final String community = configuration.getCommunityString();

@@ -62,8 +62,8 @@ public class V3SNMPFactory extends SNMPManagerFactory implements SNMPContext {
     }

     @Override
-    public Target createTargetInstance(final SNMPConfiguration configuration) {
-        final UserTarget userTarget = new UserTarget();
+    public Target<?> createTargetInstance(final SNMPConfiguration configuration) {
+        final UserTarget<?> userTarget = new UserTarget<>();
         setupTargetBasicProperties(userTarget, configuration);

         final int securityLevel = SecurityLevel.valueOf(configuration.getSecurityLevel()).getSnmpValue();
@@ -95,7 +95,7 @@
                 <executions>
                     <execution>
                         <id>unpack-standard-content-viewer-ui</id>
-                        <phase>generate-resources</phase>
+                        <phase>prepare-package</phase>
                         <goals>
                             <goal>unpack-dependencies</goal>
                         </goals>
@@ -59,7 +59,6 @@ import org.apache.nifi.serialization.record.type.MapDataType;
 import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.apache.nifi.util.StringUtils;

-import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.time.LocalDate;
 import java.time.ZoneId;

@@ -315,8 +314,7 @@ public class GenerateRecord extends AbstractProcessor {
                 return faker.number().randomDouble(6, Long.MIN_VALUE, Long.MAX_VALUE);
             case FLOAT:
                 final double randomDouble = faker.number().randomDouble(6, Long.MIN_VALUE, Long.MAX_VALUE);
-                final BigDecimal asBigDecimal = new BigDecimal(randomDouble);
-                return asBigDecimal.floatValue();
+                return (float) randomDouble;
             case DECIMAL:
                 return faker.number().randomDouble(((DecimalDataType) recordField.getDataType()).getScale(), Long.MIN_VALUE, Long.MAX_VALUE);
             case INT:
@@ -41,7 +41,6 @@ import org.apache.nifi.processor.util.listen.event.EventFactory;
 import org.apache.nifi.processor.util.listen.event.StandardEvent;
 import org.apache.nifi.processor.util.listen.event.StandardEventFactory;

-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;

@@ -121,13 +120,12 @@ public class ListenUDP extends AbstractListenEventBatchingProcessor<StandardEven
     }

     @Override
-    protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<StandardEvent> events)
-            throws IOException {
+    protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<StandardEvent> events) {
         final String sendingHost = context.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue();
         final Integer sendingHostPort = context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger();
         final Integer bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         final ByteBufferSource byteBufferSource = new ByteBufferPool(context.getMaxConcurrentTasks(), bufferSize);
-        final EventFactory<StandardEvent> eventFactory = new StandardEventFactory();
+        final EventFactory<StandardEvent> eventFactory = new StandardEventFactory<>();
         return new DatagramChannelDispatcher<>(eventFactory, byteBufferSource, events, getLogger(), sendingHost, sendingHostPort);
     }

@@ -219,7 +219,7 @@ public class ListenUDPRecord extends AbstractListenEventProcessor<StandardEvent>
         final Integer sendingHostPort = context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger();
         final Integer bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         final ByteBufferSource byteBufferSource = new ByteBufferPool(context.getMaxConcurrentTasks(), bufferSize);
-        final EventFactory<StandardEvent> eventFactory = new StandardEventFactory();
+        final EventFactory<StandardEvent> eventFactory = new StandardEventFactory<>();
         return new DatagramChannelDispatcher<>(eventFactory, byteBufferSource, events, getLogger(), sendingHost, sendingHostPort);
     }

@@ -127,7 +127,7 @@ public class ModifyBytes extends AbstractProcessor {
         ff = session.write(ff, new StreamCallback() {
             @Override
             public void process(final InputStream in, final OutputStream out) throws IOException {
-                in.skip(startOffset);
+                in.skipNBytes(startOffset);
                 StreamUtils.copy(in, out, newFileSize);
             }
         });
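The ModifyBytes change replaces InputStream.skip with skipNBytes (available since Java 12): skip may legally skip fewer bytes than requested, while skipNBytes skips exactly n bytes or throws. A small sketch with a hypothetical in-memory stream:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class SkipExample {

        public static void main(String[] args) throws IOException {
            try (InputStream in = new ByteArrayInputStream(new byte[] {1, 2, 3, 4, 5})) {
                // Skips exactly two bytes or throws EOFException if the stream is too short.
                in.skipNBytes(2);
                System.out.println(in.read()); // prints 3
            }
        }
    }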
@@ -51,7 +51,7 @@ public class ProxyConfiguration {
         }

         return new PropertyDescriptor.Builder()
-                .fromPropertyDescriptor(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE)
+                .fromPropertyDescriptor(PROXY_CONFIGURATION_SERVICE)
                 .description(description.toString())
                 .build();
     }
@@ -42,9 +42,7 @@ public abstract class AvroRecordReader implements RecordReader {
             final RecordSchema schema = getSchema();
             final Map<String, Object> values = AvroTypeUtil.convertAvroRecordToMap(record, schema);
             return new MapRecord(schema, values);
-        } catch (IOException e) {
-            throw e;
-        } catch (MalformedRecordException e) {
+        } catch (IOException | MalformedRecordException e) {
             throw e;
         } catch (Exception e) {
             throw new MalformedRecordException("Error while getting next record", e);
@@ -48,11 +48,9 @@ public class WebSocketMessageRouter {
     public synchronized void deregisterProcessor(final Processor processor) {
         if (!isProcessorRegistered(processor)) {
             if (this.processor == null) {
-                logger.info("Deregister processor {}, do nothing because this router doesn't have registered processor",
-                        new Object[]{processor});
+                logger.info("Deregister processor {}, do nothing because this router doesn't have registered processor", processor);
             } else {
-                logger.info("Deregister processor {}, do nothing because this router is assigned to different processor {}",
-                        new Object[]{processor, this.processor});
+                logger.info("Deregister processor {}, do nothing because this router is assigned to different processor {}", processor, this.processor);
             }
             return;
         }

@@ -62,7 +60,7 @@ public class WebSocketMessageRouter {
             try {
                 disconnect(sessionId, "Processing has stopped.");
             } catch (IOException e) {
-                logger.warn("Failed to disconnect session {} due to {}", sessionId, e, e);
+                logger.warn("Failed to disconnect session {} endpoint [{}] due to {}", sessionId, endpointId, e, e);
             }
         });
     }
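The logging changes above drop the explicit Object[] wrapper in favor of SLF4J's overloads that take the placeholder arguments directly. A standalone sketch with hypothetical identifiers:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingExample {

        private static final Logger logger = LoggerFactory.getLogger(LoggingExample.class);

        public static void main(String[] args) {
            final String sessionId = "session-1";
            final String endpointId = "endpoint-a";
            // Placeholders are filled positionally; wrapping the arguments in
            // new Object[]{...} is unnecessary with the varargs overload.
            logger.warn("Failed to disconnect session {} endpoint [{}]", sessionId, endpointId);
        }
    }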
@@ -23,7 +23,7 @@ public enum RequestAction {
     READ("read"),
     WRITE("write");

-    private String value;
+    private final String value;

     RequestAction(String value) {
         this.value = value;

@@ -35,12 +35,12 @@ public enum RequestAction {
     }

     public static RequestAction valueOfValue(final String action) {
-        if (RequestAction.READ.toString().equals(action)) {
-            return RequestAction.READ;
-        } else if (RequestAction.WRITE.toString().equals(action)) {
-            return RequestAction.WRITE;
+        if (READ.toString().equals(action)) {
+            return READ;
+        } else if (WRITE.toString().equals(action)) {
+            return WRITE;
         } else {
-            throw new IllegalArgumentException("Action must be one of [" + READ.toString() + ", " + WRITE.toString() + "]");
+            throw new IllegalArgumentException("Action must be one of [" + READ + ", " + WRITE + "]");
         }
     }
 }
@@ -36,7 +36,7 @@ public interface MetricDescriptor<T> {
         DURATION,
         DATA_SIZE,
         FRACTION
-    };
+    }

     int getMetricIdentifier();

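Here and in the DTO enums further down, the `};` to `}` edits drop an empty statement left after a nested type body, presumably flagged by PMD's UnnecessarySemicolon rule. A minimal sketch with a hypothetical enum:

    public class SemicolonExample {

        // No trailing semicolon after the nested enum body: "};" would leave an
        // empty statement behind the closing brace.
        public enum Unit {
            COUNT,
            DURATION,
            DATA_SIZE
        }

        public static void main(String[] args) {
            System.out.println(Unit.values().length);
        }
    }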
@@ -172,7 +172,7 @@ public class NarManifest {

         final Attributes attributes = manifest.getMainAttributes();

-        return NarManifest.builder()
+        return builder()
                 .group(attributes.getValue(NarManifestEntry.NAR_GROUP.getEntryName()))
                 .id(attributes.getValue(NarManifestEntry.NAR_ID.getEntryName()))
                 .version(attributes.getValue(NarManifestEntry.NAR_VERSION.getEntryName()))

@@ -149,7 +149,7 @@ public class NarProperties {
         final String installedValue = properties.getProperty(NarProperty.INSTALLED.getKey());
         final Instant installed = Instant.parse(installedValue);

-        return NarProperties.builder()
+        return builder()
                 .sourceType(properties.getProperty(NarProperty.SOURCE_TYPE.getKey()))
                 .sourceId(properties.getProperty(NarProperty.SOURCE_ID.getKey()))
                 .narGroup(properties.getProperty(NarProperty.NAR_GROUP.getKey()))
@@ -25,8 +25,6 @@ import org.apache.nifi.components.state.StateMap;
  * Standard implementation of StateMap
  */
 class StandardStateMap implements StateMap {
-    private static final int EMPTY_VERSION = -1;
-
     private final Map<String, String> data;

     private final Optional<String> version;
@@ -39,9 +39,6 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;

-/**
- *
- */
 public class IndexConfiguration {

     private final RepositoryConfiguration repoConfig;
@@ -280,7 +280,7 @@ public class LuceneEventIndex implements EventIndex {

     @Override
     public long getMinimumEventIdToReindex(final String partitionName) {
-        return Math.max(0, getMaxEventId(partitionName) - EventIndexTask.MAX_DOCUMENTS_PER_THREAD * LuceneEventIndex.MAX_INDEX_THREADS);
+        return Math.max(0, getMaxEventId(partitionName) - EventIndexTask.MAX_DOCUMENTS_PER_THREAD * MAX_INDEX_THREADS);
     }

     protected IndexDirectoryManager getDirectoryManager() {
@@ -47,9 +47,6 @@ import java.util.Map;

 public class LuceneUtil {

-    private static final long[] MIN_LONG_ARRAY = new long[] {Long.MIN_VALUE};
-    private static final long[] MAX_LONG_ARRAY = new long[] {Long.MAX_VALUE};
-
     public static String substringBefore(final String value, final String searchValue) {
         final int index = value.indexOf(searchValue);
         return (index < 0) ? value : value.substring(0, index);
@@ -25,7 +25,6 @@ import org.apache.nifi.repository.schema.ComplexRecordField;
 import org.apache.nifi.repository.schema.FieldType;
 import org.apache.nifi.repository.schema.MapRecordField;
 import org.apache.nifi.repository.schema.RecordField;
-import org.apache.nifi.repository.schema.Repetition;
 import org.apache.nifi.repository.schema.SimpleRecordField;
 import org.apache.nifi.repository.schema.UnionRecordField;

@@ -41,14 +40,14 @@ public class LookupTableEventRecordFields {
     public static final RecordField EVENT_DETAILS = new SimpleRecordField(EventFieldNames.EVENT_DETAILS, FieldType.STRING, ZERO_OR_ONE);

     // Make lookup id or a string, depending on whether or not available in header.
-    public static final RecordField NO_VALUE = new SimpleRecordField(EventFieldNames.NO_VALUE, FieldType.STRING, Repetition.EXACTLY_ONE);
-    public static final RecordField EXPLICIT_STRING = new SimpleRecordField(EventFieldNames.EXPLICIT_VALUE, FieldType.STRING, Repetition.EXACTLY_ONE);
-    public static final RecordField LOOKUP_VALUE = new SimpleRecordField(EventFieldNames.LOOKUP_VALUE, FieldType.INT, Repetition.EXACTLY_ONE);
-    public static final RecordField UNCHANGED_VALUE = new SimpleRecordField(EventFieldNames.UNCHANGED_VALUE, FieldType.STRING, Repetition.EXACTLY_ONE);
+    public static final RecordField NO_VALUE = new SimpleRecordField(EventFieldNames.NO_VALUE, FieldType.STRING, EXACTLY_ONE);
+    public static final RecordField EXPLICIT_STRING = new SimpleRecordField(EventFieldNames.EXPLICIT_VALUE, FieldType.STRING, EXACTLY_ONE);
+    public static final RecordField LOOKUP_VALUE = new SimpleRecordField(EventFieldNames.LOOKUP_VALUE, FieldType.INT, EXACTLY_ONE);
+    public static final RecordField UNCHANGED_VALUE = new SimpleRecordField(EventFieldNames.UNCHANGED_VALUE, FieldType.STRING, EXACTLY_ONE);

-    public static final RecordField COMPONENT_ID = new UnionRecordField(EventFieldNames.COMPONENT_ID, Repetition.EXACTLY_ONE, NO_VALUE, EXPLICIT_STRING, LOOKUP_VALUE);
-    public static final RecordField SOURCE_QUEUE_ID = new UnionRecordField(EventFieldNames.SOURCE_QUEUE_IDENTIFIER, Repetition.EXACTLY_ONE, NO_VALUE, EXPLICIT_STRING, LOOKUP_VALUE);
-    public static final RecordField COMPONENT_TYPE = new UnionRecordField(EventFieldNames.COMPONENT_TYPE, Repetition.EXACTLY_ONE, EXPLICIT_STRING, LOOKUP_VALUE);
+    public static final RecordField COMPONENT_ID = new UnionRecordField(EventFieldNames.COMPONENT_ID, EXACTLY_ONE, NO_VALUE, EXPLICIT_STRING, LOOKUP_VALUE);
+    public static final RecordField SOURCE_QUEUE_ID = new UnionRecordField(EventFieldNames.SOURCE_QUEUE_IDENTIFIER, EXACTLY_ONE, NO_VALUE, EXPLICIT_STRING, LOOKUP_VALUE);
+    public static final RecordField COMPONENT_TYPE = new UnionRecordField(EventFieldNames.COMPONENT_TYPE, EXACTLY_ONE, EXPLICIT_STRING, LOOKUP_VALUE);

     // Attributes
     public static final RecordField ATTRIBUTE_NAME = new SimpleRecordField(EventFieldNames.ATTRIBUTE_NAME, FieldType.LONG_STRING, EXACTLY_ONE);

@@ -71,7 +70,7 @@ public class LookupTableEventRecordFields {
     public static final RecordField CURRENT_CONTENT_CLAIM_EXPLICIT = new ComplexRecordField(EventFieldNames.EXPLICIT_VALUE, EXACTLY_ONE,
         CONTENT_CLAIM_CONTAINER, CONTENT_CLAIM_SECTION, CONTENT_CLAIM_IDENTIFIER, CONTENT_CLAIM_OFFSET, CONTENT_CLAIM_SIZE);
     public static final RecordField CURRENT_CONTENT_CLAIM = new UnionRecordField(EventFieldNames.CONTENT_CLAIM,
-        Repetition.EXACTLY_ONE, NO_VALUE, UNCHANGED_VALUE, CURRENT_CONTENT_CLAIM_EXPLICIT);
+        EXACTLY_ONE, NO_VALUE, UNCHANGED_VALUE, CURRENT_CONTENT_CLAIM_EXPLICIT);


     // EventType-Specific fields
@@ -30,8 +30,8 @@ import java.util.List;

 interface StatusHistoryStorage {

-    default void init() { };
-    default void close() { };
+    default void init() { }
+    default void close() { }

     List<StatusSnapshot> getConnectionSnapshots(final String componentId, final Date start, final Date end);
     List<StatusSnapshot> getProcessGroupSnapshots(final String componentId, final Date start, final Date end);
@@ -34,7 +34,7 @@ enum StorageStatusType {
     }

     static StorageStatusType getById(final int id) {
-        final Optional<StorageStatusType> result = Arrays.stream(StorageStatusType.values()).filter(storageStatusType -> storageStatusType.getId() == id).findFirst();
+        final Optional<StorageStatusType> result = Arrays.stream(values()).filter(storageStatusType -> storageStatusType.getId() == id).findFirst();

         if (result.isEmpty()) {
             throw new IllegalArgumentException("Unknown storage type id " + id);
@@ -50,7 +50,7 @@ final class EmbeddedDatabaseManager implements DatabaseManager {
     private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedDatabaseManager.class);

     private final String id = UUID.randomUUID().toString();
-    private final AtomicReference<EmbeddedDatabaseManagerStatus> state = new AtomicReference(EmbeddedDatabaseManagerStatus.UNINITIALIZED);
+    private final AtomicReference<EmbeddedDatabaseManagerStatus> state = new AtomicReference<>(EmbeddedDatabaseManagerStatus.UNINITIALIZED);
     private final ReadWriteLock databaseStructureLock = new ReentrantReadWriteLock();
     private final EmbeddedDatabaseManagerContext context;
     private final AtomicReference<CairoEngine> engine = new AtomicReference<>();
@@ -35,8 +35,8 @@ public class LineageRequestDTO {
     public enum LineageRequestType {
         PARENTS,
         CHILDREN,
-        FLOWFILE;
-    };
+        FLOWFILE
+    }

     private Long eventId;
     private LineageRequestType lineageRequestType;
@@ -31,7 +31,7 @@ public class StatusDescriptorDTO {
         COUNT,
         DURATION,
         DATA_SIZE
-    };
+    }

     private String field;
     private String label;
@@ -67,7 +67,7 @@ public class AuthorizationsHolder {
      * @param policies the JAXB Policies element
      * @return a set of AccessPolicies corresponding to the provided Resources
      */
-    private Set<AccessPolicy> createAccessPolicies(org.apache.nifi.authorization.file.generated.Policies policies) {
+    private Set<AccessPolicy> createAccessPolicies(final Policies policies) {
         Set<AccessPolicy> allPolicies = new HashSet<>();
         if (policies == null || policies.getPolicy() == null) {
             return allPolicies;
@@ -54,7 +54,7 @@ public final class NiFiUserUtils {

     public static String getNiFiUserIdentity() {
         // get the nifi user to extract the username
-        NiFiUser user = NiFiUserUtils.getNiFiUser();
+        NiFiUser user = getNiFiUser();
         if (user == null) {
             return "unknown";
         } else {
@@ -55,6 +55,6 @@ public class FlowRegistryClientEndpointMerger extends AbstractSingleEntityEndpoi
     protected void mergeResponses(
             final FlowRegistryClientEntity clientEntity, final Map<NodeIdentifier, FlowRegistryClientEntity> entityMap,
             final Set<NodeResponse> successfulResponses, final Set<NodeResponse> problematicResponses) {
-        flowRegistryClientEntityMerger.merge(clientEntity, entityMap);;
+        flowRegistryClientEntityMerger.merge(clientEntity, entityMap);
     }
 }
@@ -70,6 +70,7 @@ public final class StandardConnection implements Connection {
     private final AtomicInteger labelIndex = new AtomicInteger(1);
     private final AtomicLong zIndex = new AtomicLong(DEFAULT_Z_INDEX);
     private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
+    @SuppressWarnings("PMD.UnusedPrivateField")
     private final ProcessScheduler scheduler;
     private final int hashCode;

@@ -383,6 +384,7 @@ public final class StandardConnection implements Connection {
         private Connectable destination;
         private Collection<Relationship> relationships;
         private FlowFileQueueFactory flowFileQueueFactory;
+        @SuppressWarnings("PMD.UnusedPrivateField")
         private boolean clustered = false;

         public Builder(final ProcessScheduler scheduler) {
Some files were not shown because too many files have changed in this diff