mirror of https://github.com/apache/nifi.git
NIFI-2816 - Clean typos across the code - Part 2. This closes #1085
This commit is contained in:
parent
f157076378
commit
3b408f5601
|
@ -37,7 +37,7 @@ public interface ControllerServiceLookup {
|
|||
boolean isControllerServiceEnabled(String serviceIdentifier);
|
||||
|
||||
/**
|
||||
* @param serviceIdentifier idenfitier of service to check
|
||||
* @param serviceIdentifier identifier of service to check
|
||||
* @return <code>true</code> if the Controller Service with the given
|
||||
* identifier has been enabled but is still in the transitioning state,
|
||||
* otherwise returns <code>false</code>. If the given identifier is not
|
||||
|
|
|
@ -91,7 +91,7 @@ public class ShutdownHook extends Thread {
|
|||
System.err.println("Failed to delete status file " + statusFile.getAbsolutePath() + "; this file should be cleaned up manually");
|
||||
}
|
||||
}catch (IOException ex){
|
||||
System.err.println("Failed to retrive status file " + ex);
|
||||
System.err.println("Failed to retrieve status file " + ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -105,7 +105,7 @@ final class ValueLookup implements Map<String, String> {
|
|||
if (value == null) {
|
||||
return false;
|
||||
}
|
||||
//check entrySet then iterate through values (otherwise might find a value that was hidden/overriden
|
||||
//check entrySet then iterate through values (otherwise might find a value that was hidden/overridden
|
||||
final Collection<String> values = values();
|
||||
return values.contains(value.toString());
|
||||
}
|
||||
|
|
|
@ -222,7 +222,7 @@ public class TestHL7Query {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testAndWithParens() throws HL7Exception, IOException {
|
||||
public void testAndWithParents() throws HL7Exception, IOException {
|
||||
HL7Query hl7Query = HL7Query.compile("DECLARE result AS REQUIRED OBX SELECT MESSAGE WHERE result.7 = 'L' AND result.3.1 = 'GLU'");
|
||||
QueryResult result = hl7Query.evaluate(createMessage(new File("src/test/resources/hypoglycemia")));
|
||||
assertTrue(result.isMatch());
|
||||
|
|
|
@ -73,14 +73,14 @@ public class BufferPool implements Runnable {
|
|||
private synchronized void computeRate() {
|
||||
final Calendar now = Calendar.getInstance();
|
||||
final long measurementDurationMillis = now.getTimeInMillis() - lastRateSampleTime.getTimeInMillis();
|
||||
final double duractionSecs = ((double) measurementDurationMillis) / 1000.0;
|
||||
if (duractionSecs >= 0.75) { //recompute every 3/4 second or when we're too fast
|
||||
final long totalDuractionMillis = now.getTimeInMillis() - startTime.getTimeInMillis();
|
||||
final double totalDurationSecs = ((double) totalDuractionMillis) / 1000.0;
|
||||
final double durationSecs = ((double) measurementDurationMillis) / 1000.0;
|
||||
if (durationSecs >= 0.75) { //recompute every 3/4 second or when we're too fast
|
||||
final long totalDurationMillis = now.getTimeInMillis() - startTime.getTimeInMillis();
|
||||
final double totalDurationSecs = ((double) totalDurationMillis) / 1000.0;
|
||||
final long differenceBytes = totalBytesExtracted - lastTotalBytesExtracted;
|
||||
lastTotalBytesExtracted = totalBytesExtracted;
|
||||
lastRateSampleTime = now;
|
||||
final double bps = ((double) differenceBytes) / duractionSecs;
|
||||
final double bps = ((double) differenceBytes) / durationSecs;
|
||||
final double totalBps = ((double) totalBytesExtracted / totalDurationSecs);
|
||||
lastRateSampleMBps = bps / ONE_MB;
|
||||
overallMBps = totalBps / ONE_MB;
|
||||
|
|
|
@ -19,11 +19,11 @@ package org.apache.nifi.flowfile.attributes;
|
|||
public enum CoreAttributes implements FlowFileAttributeKey {
|
||||
|
||||
/**
|
||||
* The flowfile's path indicates the relative directory to which a FlowFile belongs and does not contain the filename
|
||||
* The FlowFile's path indicates the relative directory to which a FlowFile belongs and does not contain the filename
|
||||
*/
|
||||
PATH("path"),
|
||||
/**
|
||||
* The flowfile's absolute path indicates the absolute directory to which a FlowFile belongs and does not contain the filename
|
||||
* The FlowFile's absolute path indicates the absolute directory to which a FlowFile belongs and does not contain the filename
|
||||
*/
|
||||
ABSOLUTE_PATH("absolute.path"),
|
||||
/**
|
||||
|
|
|
@ -95,7 +95,7 @@ public class ClientUtils {
|
|||
*
|
||||
* @param uri the uri to post to
|
||||
* @param formData the data to post
|
||||
* @return the client reponse of the post
|
||||
* @return the client response of the post
|
||||
*/
|
||||
public ClientResponse post(URI uri, Map<String, String> formData) throws ClientHandlerException, UniformInterfaceException {
|
||||
// convert the form data
|
||||
|
|
|
@ -40,7 +40,7 @@ public interface SwapSummary {
|
|||
Long getMaxFlowFileId();
|
||||
|
||||
/**
|
||||
* Returns a List of all ResoruceClaims that are referenced by the FlowFiles in the swap file.
|
||||
* Returns a List of all ResourceClaims that are referenced by the FlowFiles in the swap file.
|
||||
* This List may well contain the same ResourceClaim many times. This indicates that many FlowFiles
|
||||
* reference the same ResourceClaim.
|
||||
*
|
||||
|
|
|
@ -48,7 +48,7 @@ public interface ResourceClaimManager {
|
|||
* particular piece of FlowFile content and returns the new count
|
||||
*
|
||||
* @param claim to decrement claimants on
|
||||
* @return new claimaint count
|
||||
* @return new claimant count
|
||||
*/
|
||||
int decrementClaimantCount(ResourceClaim claim);
|
||||
|
||||
|
|
|
@ -37,7 +37,7 @@ public interface LineageNode {
|
|||
* @return the timestamp that corresponds to this Node. The meaning of the
|
||||
* timestamp may differ between implementations. For example, a
|
||||
* {@link ProvenanceEventLineageNode}'s timestamp indicates the time at
|
||||
* which the event occurred. However, for a Node that reperesents a
|
||||
* which the event occurred. However, for a Node that represents a
|
||||
* FlowFile, for example, the timestamp may represent the time at which the
|
||||
* FlowFile was created
|
||||
*/
|
||||
|
|
|
@ -465,7 +465,7 @@ public interface TestRunner {
|
|||
* to the given relationship
|
||||
*
|
||||
* @param relationship to get flowfiles for
|
||||
* @return flowfiles transfered to given relationship
|
||||
* @return flowfiles transferred to given relationship
|
||||
*/
|
||||
List<MockFlowFile> getFlowFilesForRelationship(String relationship);
|
||||
|
||||
|
@ -474,7 +474,7 @@ public interface TestRunner {
|
|||
* to the given relationship
|
||||
*
|
||||
* @param relationship to get flowfiles for
|
||||
* @return flowfiles transfered to given relationship
|
||||
* @return flowfiles transferred to given relationship
|
||||
*/
|
||||
List<MockFlowFile> getFlowFilesForRelationship(Relationship relationship);
|
||||
|
||||
|
@ -757,7 +757,7 @@ public interface TestRunner {
|
|||
ValidationResult setProperty(ControllerService service, String propertyName, String value);
|
||||
|
||||
/**
|
||||
* Sets the annontation data of the given service to the provided annotation
|
||||
* Sets the annotation data of the given service to the provided annotation
|
||||
* data.
|
||||
*
|
||||
* @param service to modify
|
||||
|
|
|
@ -72,7 +72,7 @@ public abstract class AbstractAWSCredentialsProviderProcessor<ClientType extends
|
|||
protected void onScheduledUsingControllerService(ProcessContext context) {
|
||||
final ClientType awsClient = createClient(context, getCredentialsProvider(context), createConfiguration(context));
|
||||
this.client = awsClient;
|
||||
super.intializeRegionAndEndpoint(context);
|
||||
super.initializeRegionAndEndpoint(context);
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -204,10 +204,10 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
|
|||
public void onScheduled(final ProcessContext context) {
|
||||
final ClientType awsClient = createClient(context, getCredentials(context), createConfiguration(context));
|
||||
this.client = awsClient;
|
||||
intializeRegionAndEndpoint(context);
|
||||
initializeRegionAndEndpoint(context);
|
||||
}
|
||||
|
||||
protected void intializeRegionAndEndpoint(ProcessContext context) {
|
||||
protected void initializeRegionAndEndpoint(ProcessContext context) {
|
||||
// if the processor supports REGION, get the configured region.
|
||||
if (getSupportedPropertyDescriptors().contains(REGION)) {
|
||||
final String region = context.getProperty(REGION).getValue();
|
||||
|
|
|
@ -67,7 +67,7 @@ public class TestGetSQS {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleGetUsingCredentailsProviderService() throws Throwable {
|
||||
public void testSimpleGetUsingCredentialsProviderService() throws Throwable {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new GetSQS());
|
||||
|
||||
runner.setProperty(GetSQS.TIMEOUT, "30 secs");
|
||||
|
|
|
@ -54,7 +54,7 @@ public class TestPutSQS {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSimplePutUsingCredentailsProviderService() throws Throwable {
|
||||
public void testSimplePutUsingCredentialsProviderService() throws Throwable {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new PutSQS());
|
||||
|
||||
runner.setProperty(PutSQS.TIMEOUT, "30 secs");
|
||||
|
|
|
@ -50,7 +50,7 @@ public class TestMetricsService {
|
|||
status.setActiveThreadCount(5);
|
||||
}
|
||||
|
||||
//test group status metric retreiveing
|
||||
//test group status metric retrieving
|
||||
@Test
|
||||
public void testGetProcessGroupStatusMetrics() {
|
||||
ProcessorStatus procStatus = new ProcessorStatus();
|
||||
|
@ -71,7 +71,7 @@ public class TestMetricsService {
|
|||
Assert.assertTrue(metrics.containsKey(MetricNames.ACTIVE_THREADS));
|
||||
}
|
||||
|
||||
//test processor status metric retreiveing
|
||||
//test processor status metric retrieving
|
||||
@Test
|
||||
public void testGetProcessorGroupStatusMetrics() {
|
||||
ProcessorStatus procStatus = new ProcessorStatus();
|
||||
|
@ -88,7 +88,7 @@ public class TestMetricsService {
|
|||
Assert.assertTrue(metrics.containsKey(MetricNames.ACTIVE_THREADS));
|
||||
}
|
||||
|
||||
//test JVM status metric retreiveing
|
||||
//test JVM status metric retrieving
|
||||
@Test
|
||||
public void testGetVirtualMachineMetrics() {
|
||||
final VirtualMachineMetrics virtualMachineMetrics = VirtualMachineMetrics.getInstance();
|
||||
|
|
|
@ -43,9 +43,9 @@ public class RootNode extends BxmlNode {
|
|||
List<VariantTypeSizeAndFactory> substitutionVariantFactories = new ArrayList<>(substitutionCount);
|
||||
for (long i = 0; i < substitutionCount; i++) {
|
||||
try {
|
||||
int substitionSize = binaryReader.readWord();
|
||||
int substitutionSize = binaryReader.readWord();
|
||||
int substitutionType = binaryReader.readWord();
|
||||
substitutionVariantFactories.add(new VariantTypeSizeAndFactory(substitionSize, ValueNode.factories.get(substitutionType)));
|
||||
substitutionVariantFactories.add(new VariantTypeSizeAndFactory(substitutionSize, ValueNode.factories.get(substitutionType)));
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
|
|
@ -25,7 +25,7 @@ import org.apache.nifi.processors.evtx.parser.bxml.BxmlNode;
|
|||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Node contianing a signed 32 bit value
|
||||
* Node containing a signed 32 bit value
|
||||
*/
|
||||
public class SignedDWordTypeNode extends VariantTypeNode {
|
||||
private final UnsignedInteger value;
|
||||
|
|
|
@ -157,7 +157,7 @@ public class RemoteProcessGroupDTO extends ComponentDTO {
|
|||
* @return the time period used for the timeout when communicating with this RemoteProcessGroup
|
||||
*/
|
||||
@ApiModelProperty(
|
||||
value = "The time period used for the timeout when commicating with the target."
|
||||
value = "The time period used for the timeout when communicating with the target."
|
||||
)
|
||||
public String getCommunicationsTimeout() {
|
||||
return communicationsTimeout;
|
||||
|
@ -213,7 +213,7 @@ public class RemoteProcessGroupDTO extends ComponentDTO {
|
|||
* @return number of active remote output ports
|
||||
*/
|
||||
@ApiModelProperty(
|
||||
value = "The number of acitve remote output ports."
|
||||
value = "The number of active remote output ports."
|
||||
)
|
||||
public Integer getActiveRemoteOutputPortCount() {
|
||||
return activeRemoteOutputPortCount;
|
||||
|
|
|
@ -194,7 +194,7 @@ public class ReportingTaskDTO extends ComponentDTO {
|
|||
* @return currently configured annotation data for the reporting task
|
||||
*/
|
||||
@ApiModelProperty(
|
||||
value = "The anntation data for the repoting task. This is how the custom UI relays configuration to the reporting task."
|
||||
value = "The annotation data for the repoting task. This is how the custom UI relays configuration to the reporting task."
|
||||
)
|
||||
public String getAnnotationData() {
|
||||
return annotationData;
|
||||
|
|
|
@ -128,7 +128,7 @@ public class ProvenanceResultsDTO {
|
|||
}
|
||||
|
||||
/**
|
||||
* @return time offset on the server thats used for event time
|
||||
* @return time offset on the server that's used for event time
|
||||
*/
|
||||
@ApiModelProperty(
|
||||
value = "The time offset of the server that's used for event time."
|
||||
|
|
|
@ -213,7 +213,7 @@ public class ProcessGroupStatusSnapshotDTO implements Cloneable {
|
|||
*
|
||||
* @return The transferred status for this process group
|
||||
*/
|
||||
@ApiModelProperty("The count/size transferred to/frome queues in the process group in the last 5 minutes.")
|
||||
@ApiModelProperty("The count/size transferred to/from queues in the process group in the last 5 minutes.")
|
||||
public String getTransferred() {
|
||||
return transferred;
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ import javax.xml.bind.annotation.XmlRootElement;
|
|||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* A serialized representation of this class can be placed in the entity body of a response to the API. This particular entity holds a reference to a list of input InputPortEntitys.
|
||||
* A serialized representation of this class can be placed in the entity body of a response to the API. This particular entity holds a reference to a list of input InputPortEntity's.
|
||||
*/
|
||||
@XmlRootElement(name = "inputPortsEntity")
|
||||
public class InputPortsEntity extends Entity {
|
||||
|
|
|
@ -32,7 +32,7 @@ public class RemoteProcessGroupStatusSnapshotEntity extends Entity implements Re
|
|||
/**
|
||||
* @return The remote process group id
|
||||
*/
|
||||
@ApiModelProperty("The id of the remote processo group.")
|
||||
@ApiModelProperty("The id of the remote process group.")
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
|
|
@ -209,7 +209,7 @@ public class StandardHttpResponseMerger implements HttpResponseMerger {
|
|||
|
||||
private void drainResponses(final Set<NodeResponse> responses, final NodeResponse exclude) {
|
||||
responses.stream()
|
||||
.parallel() // parallelize the draining of the responses, since we have multiple streams to consume
|
||||
.parallel() // "parallelize" the draining of the responses, since we have multiple streams to consume
|
||||
.filter(response -> response != exclude) // don't include the explicitly excluded node
|
||||
.filter(response -> response.getStatus() != RequestReplicator.NODE_CONTINUE_STATUS_CODE) // don't include any 150-NodeContinue responses because they contain no content
|
||||
.forEach(response -> drainResponse(response)); // drain all node responses that didn't get filtered out
|
||||
|
|
|
@ -140,7 +140,7 @@ public class ProvenanceQueryEndpointMerger implements EndpointResponseMerger {
|
|||
// Since we get back up to the maximum number of results from each node, we need to sort those values and then
|
||||
// grab only the first X number of them. We do a sort based on time, such that the newest are included.
|
||||
// If 2 events have the same timestamp, we do a secondary sort based on Cluster Node Identifier. If those are
|
||||
// equal, we perform a terciary sort based on the the event id
|
||||
// equal, we perform a tertiary sort based on the the event id
|
||||
Collections.sort(allResults, new Comparator<ProvenanceEventDTO>() {
|
||||
@Override
|
||||
public int compare(final ProvenanceEventDTO o1, final ProvenanceEventDTO o2) {
|
||||
|
|
|
@ -336,7 +336,7 @@ public class TestThreadPoolRequestReplicator {
|
|||
try {
|
||||
Thread.sleep(delayMillis);
|
||||
} catch (InterruptedException e) {
|
||||
Assert.fail("Thread Interrupted durating test");
|
||||
Assert.fail("Thread Interrupted during test");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -51,7 +51,7 @@ import org.apache.nifi.web.Revision;
|
|||
*
|
||||
* <p>
|
||||
* If the first phase of the above two-phase commit completes and all nodes indicate that the
|
||||
* request may continue, this means that all nodes have agreed that the client's Revisios are
|
||||
* request may continue, this means that all nodes have agreed that the client's Revisions are
|
||||
* acceptable.
|
||||
* </p>
|
||||
*/
|
||||
|
|
|
@ -216,7 +216,7 @@ public class TemplateUtils {
|
|||
* so that if one of these things changes, the template itself changes, which makes it hard to
|
||||
* use a CM tool for versioning. So we remove all that we don't need.
|
||||
*
|
||||
* @param descriptor the ProeprtyDescriptor to scrub
|
||||
* @param descriptor the PropertyDescriptor to scrub
|
||||
*/
|
||||
private static void scrubPropertyDescriptor(final PropertyDescriptorDTO descriptor) {
|
||||
descriptor.setAllowableValues(null);
|
||||
|
|
|
@ -136,7 +136,7 @@ public class StandardStateManagerProvider implements StateManagerProvider{
|
|||
if (providerId.trim().isEmpty()) {
|
||||
throw new IllegalStateException("Cannot create " + providerDescription + " because the '" + providerIdPropertyName
|
||||
+ "' property in the NiFi Properties file has no value set. This is a required property and must reference the identifier of one of the "
|
||||
+ providerXmlElementName + " elements in the State Management Configuraiton File (" + configFile + ")");
|
||||
+ providerXmlElementName + " elements in the State Management Configuration File (" + configFile + ")");
|
||||
}
|
||||
|
||||
final StateManagerConfiguration config = StateManagerConfiguration.parse(configFile);
|
||||
|
|
|
@ -232,7 +232,7 @@ public class ReflectionUtils {
|
|||
try {
|
||||
return invokeMethodsWithAnnotations(true, logger, instance, annotationArray, args);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed while attemptiing to invoke methods with '" + Arrays.asList(annotationArray) + "' annotations", e);
|
||||
LOG.error("Failed while attempting to invoke methods with '" + Arrays.asList(annotationArray) + "' annotations", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -91,12 +91,12 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
|
|||
private class TransactionWrapper {
|
||||
|
||||
private final FlowFileTransaction transaction;
|
||||
private final HandshakeProperties handshakenProperties;
|
||||
private final HandshakeProperties handshakeProperties;
|
||||
private long lastCommunicationAt;
|
||||
|
||||
private TransactionWrapper(final FlowFileTransaction transaction, final HandshakeProperties handshakenProperties) {
|
||||
private TransactionWrapper(final FlowFileTransaction transaction, final HandshakeProperties handshakeProperties) {
|
||||
this.transaction = transaction;
|
||||
this.handshakenProperties = handshakenProperties;
|
||||
this.handshakeProperties = handshakeProperties;
|
||||
this.lastCommunicationAt = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
|
@ -197,7 +197,7 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
|
|||
public HandshakeProperties getHandshakenProperties(final String transactionId) {
|
||||
TransactionWrapper transaction = transactions.get(transactionId);
|
||||
if (isTransactionActive(transaction)) {
|
||||
return transaction.handshakenProperties;
|
||||
return transaction.handshakeProperties;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -63,7 +63,7 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
|
|||
protected boolean shutdown = false;
|
||||
protected FlowFileCodec negotiatedFlowFileCodec = null;
|
||||
|
||||
protected HandshakeProperties handshakenProperties;
|
||||
protected HandshakeProperties handshakeProperties;
|
||||
|
||||
protected static final long DEFAULT_BATCH_NANOS = TimeUnit.SECONDS.toNanos(5L);
|
||||
|
||||
|
@ -195,7 +195,7 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
|
|||
|
||||
logger.debug("{} Handshaking with {}", this, peer);
|
||||
|
||||
this.handshakenProperties = doHandshake(peer);
|
||||
this.handshakeProperties = doHandshake(peer);
|
||||
|
||||
logger.debug("{} Finished handshake with {}", this, peer);
|
||||
handshakeCompleted = true;
|
||||
|
@ -242,7 +242,7 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
|
|||
String calculatedCRC = "";
|
||||
OutputStream os = new DataOutputStream(commsSession.getOutput().getOutputStream());
|
||||
while (continueTransaction) {
|
||||
final boolean useGzip = handshakenProperties.isUseGzip();
|
||||
final boolean useGzip = handshakeProperties.isUseGzip();
|
||||
final OutputStream flowFileOutputStream = useGzip ? new CompressionOutputStream(os) : os;
|
||||
logger.debug("{} Sending {} to {}", new Object[]{this, flowFile, peer});
|
||||
|
||||
|
@ -278,15 +278,15 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
|
|||
// determine if we should check for more data on queue.
|
||||
final long sendingNanos = System.nanoTime() - startNanos;
|
||||
boolean poll = true;
|
||||
double batchDurationNanos = handshakenProperties.getBatchDurationNanos();
|
||||
double batchDurationNanos = handshakeProperties.getBatchDurationNanos();
|
||||
if (sendingNanos >= batchDurationNanos && batchDurationNanos > 0L) {
|
||||
poll = false;
|
||||
}
|
||||
double batchBytes = handshakenProperties.getBatchBytes();
|
||||
double batchBytes = handshakeProperties.getBatchBytes();
|
||||
if (bytesSent >= batchBytes && batchBytes > 0L) {
|
||||
poll = false;
|
||||
}
|
||||
double batchCount = handshakenProperties.getBatchCount();
|
||||
double batchCount = handshakeProperties.getBatchCount();
|
||||
if (flowFilesSent.size() >= batchCount && batchCount > 0) {
|
||||
poll = false;
|
||||
}
|
||||
|
@ -433,7 +433,7 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
|
|||
boolean continueTransaction = true;
|
||||
while (continueTransaction) {
|
||||
final long startNanos = System.nanoTime();
|
||||
final InputStream flowFileInputStream = handshakenProperties.isUseGzip() ? new CompressionInputStream(dis) : dis;
|
||||
final InputStream flowFileInputStream = handshakeProperties.isUseGzip() ? new CompressionInputStream(dis) : dis;
|
||||
final CheckedInputStream checkedInputStream = new CheckedInputStream(flowFileInputStream, crc);
|
||||
|
||||
final DataPacket dataPacket = codec.decode(checkedInputStream);
|
||||
|
@ -551,12 +551,12 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
|
|||
|
||||
@Override
|
||||
public long getRequestExpiration() {
|
||||
return handshakenProperties.getExpirationMillis();
|
||||
return handshakeProperties.getExpirationMillis();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
String commid = handshakenProperties != null ? handshakenProperties.getCommsIdentifier() : null;
|
||||
String commid = handshakeProperties != null ? handshakeProperties.getCommsIdentifier() : null;
|
||||
return getClass().getSimpleName() + "[CommsID=" + commid + "]";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -181,7 +181,7 @@ public class StandardHttpFlowFileServerProtocol extends AbstractFlowFileServerPr
|
|||
HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
|
||||
String transactionId = commSession.getTransactionId();
|
||||
logger.debug("{} Holding transaction. transactionId={}", this, transactionId);
|
||||
transactionManager.holdTransaction(transactionId, transaction, handshakenProperties);
|
||||
transactionManager.holdTransaction(transactionId, transaction, handshakeProperties);
|
||||
|
||||
return transaction.getFlowFilesSent().size();
|
||||
}
|
||||
|
|
|
@ -222,7 +222,7 @@ public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol
|
|||
|
||||
@Override
|
||||
protected String createTransitUri(Peer peer, String sourceFlowFileIdentifier) {
|
||||
String transitUriPrefix = handshakenProperties.getTransitUriPrefix();
|
||||
String transitUriPrefix = handshakeProperties.getTransitUriPrefix();
|
||||
return (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -96,7 +96,7 @@ public class AccessPolicyAuditor extends NiFiAuditor {
|
|||
+ "args(accessPolicyDTO) && "
|
||||
+ "target(accessPolicyDAO)")
|
||||
public AccessPolicy updateAccessPolicyAdvice(ProceedingJoinPoint proceedingJoinPoint, AccessPolicyDTO accessPolicyDTO, AccessPolicyDAO accessPolicyDAO) throws Throwable {
|
||||
// determine the initial values for each property/setting thats changing
|
||||
// determine the initial values for each property/setting that's changing
|
||||
AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(accessPolicyDTO.getId());
|
||||
final Map<String, String> values = extractConfiguredPropertyValues(accessPolicy, accessPolicyDTO);
|
||||
|
||||
|
|
|
@ -102,7 +102,7 @@ public class ControllerServiceAuditor extends NiFiAuditor {
|
|||
+ "args(controllerServiceDTO) && "
|
||||
+ "target(controllerServiceDAO)")
|
||||
public Object updateControllerServiceAdvice(ProceedingJoinPoint proceedingJoinPoint, ControllerServiceDTO controllerServiceDTO, ControllerServiceDAO controllerServiceDAO) throws Throwable {
|
||||
// determine the initial values for each property/setting thats changing
|
||||
// determine the initial values for each property/setting that's changing
|
||||
ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceDTO.getId());
|
||||
final Map<String, String> values = extractConfiguredPropertyValues(controllerService, controllerServiceDTO);
|
||||
final boolean isDisabled = isDisabled(controllerService);
|
||||
|
|
|
@ -110,7 +110,7 @@ public class ProcessorAuditor extends NiFiAuditor {
|
|||
+ "args(processorDTO) && "
|
||||
+ "target(processorDAO)")
|
||||
public ProcessorNode updateProcessorAdvice(ProceedingJoinPoint proceedingJoinPoint, ProcessorDTO processorDTO, ProcessorDAO processorDAO) throws Throwable {
|
||||
// determine the initial values for each property/setting thats changing
|
||||
// determine the initial values for each property/setting that's changing
|
||||
ProcessorNode processor = processorDAO.getProcessor(processorDTO.getId());
|
||||
final Map<String, String> values = extractConfiguredPropertyValues(processor, processorDTO);
|
||||
final ScheduledState scheduledState = processor.getScheduledState();
|
||||
|
|
|
@ -90,7 +90,7 @@ public class UserAuditor extends NiFiAuditor {
|
|||
+ "args(userDTO) && "
|
||||
+ "target(userDAO)")
|
||||
public User updateUserAdvice(ProceedingJoinPoint proceedingJoinPoint, UserDTO userDTO, UserDAO userDAO) throws Throwable {
|
||||
// determine the initial values for each property/setting thats changing
|
||||
// determine the initial values for each property/setting that's changing
|
||||
User user = userDAO.getUser(userDTO.getId());
|
||||
final Map<String, String> values = extractConfiguredPropertyValues(user, userDTO);
|
||||
|
||||
|
|
|
@ -96,7 +96,7 @@ public class UserGroupAuditor extends NiFiAuditor {
|
|||
+ "args(userGroupDTO) && "
|
||||
+ "target(userGroupDAO)")
|
||||
public Group updateUserAdvice(ProceedingJoinPoint proceedingJoinPoint, UserGroupDTO userGroupDTO, UserGroupDAO userGroupDAO) throws Throwable {
|
||||
// determine the initial values for each property/setting thats changing
|
||||
// determine the initial values for each property/setting that's changing
|
||||
Group user = userGroupDAO.getUserGroup(userGroupDTO.getId());
|
||||
final Map<String, String> values = extractConfiguredPropertyValues(user, userGroupDTO);
|
||||
|
||||
|
|
|
@ -1902,7 +1902,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
|||
}
|
||||
|
||||
/**
|
||||
* Creates entities for compnents referencing a ControllerServcie using the specified revisions.
|
||||
* Creates entities for components referencing a ControllerServcie using the specified revisions.
|
||||
*
|
||||
* @param reference ControllerServiceReference
|
||||
* @param revisions The revisions
|
||||
|
@ -2339,7 +2339,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
|||
final ProcessorNode processor = processorDAO.getProcessor(id);
|
||||
PropertyDescriptor descriptor = processor.getPropertyDescriptor(property);
|
||||
|
||||
// return an invalid descriptor if the processor doesn't suppor this property
|
||||
// return an invalid descriptor if the processor doesn't support this property
|
||||
if (descriptor == null) {
|
||||
descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();
|
||||
}
|
||||
|
|
|
@ -25,7 +25,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Maps mutal request exceptions into client responses.
|
||||
* Maps mutual request exceptions into client responses.
|
||||
*/
|
||||
@Provider
|
||||
public class MutableRequestExceptionMapper implements ExceptionMapper<MutableRequestException> {
|
||||
|
|
|
@ -1058,7 +1058,7 @@ public class ControllerFacade implements Authorizable {
|
|||
provenanceDto.setResults(resultsDto);
|
||||
return provenanceDto;
|
||||
} catch (final IOException ioe) {
|
||||
throw new NiFiCoreException("An error occured while searching the provenance events.", ioe);
|
||||
throw new NiFiCoreException("An error occurred while searching the provenance events.", ioe);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1220,7 +1220,7 @@ public class ControllerFacade implements Authorizable {
|
|||
// convert the event record
|
||||
return createProvenanceEventDto(event);
|
||||
} catch (final IOException ioe) {
|
||||
throw new NiFiCoreException("An error occured while getting the specified event.", ioe);
|
||||
throw new NiFiCoreException("An error occurred while getting the specified event.", ioe);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1292,7 +1292,7 @@ public class ControllerFacade implements Authorizable {
|
|||
// convert the event
|
||||
return createProvenanceEventDto(event);
|
||||
} catch (final IOException ioe) {
|
||||
throw new NiFiCoreException("An error occured while getting the specified event.", ioe);
|
||||
throw new NiFiCoreException("An error occurred while getting the specified event.", ioe);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ import java.util.Set;
|
|||
public interface TemplateDAO {
|
||||
|
||||
/**
|
||||
* Verifies a new template can be created with the specifed name in the specified group.
|
||||
* Verifies a new template can be created with the specified name in the specified group.
|
||||
*
|
||||
* @param name template name
|
||||
* @param groupId group id
|
||||
|
|
|
@ -159,7 +159,7 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
|
|||
// get the controller service
|
||||
final ControllerServiceNode controllerService = locateControllerService(controllerServiceId);
|
||||
|
||||
// this request is either acting upon referncing services or schedulable components
|
||||
// this request is either acting upon referencing services or schedulable components
|
||||
if (controllerServiceState != null) {
|
||||
if (ControllerServiceState.ENABLED.equals(controllerServiceState)) {
|
||||
return serviceProvider.enableReferencingServices(controllerService);
|
||||
|
|
|
@ -131,7 +131,7 @@ public class ITAccessTokenEndpoint {
|
|||
public void testCreateProcessorUsingToken() throws Exception {
|
||||
String url = BASE_URL + "/access/token";
|
||||
|
||||
ClientResponse response = TOKEN_USER.testCreateToken(url, "user@nifi", "whateve");
|
||||
ClientResponse response = TOKEN_USER.testCreateToken(url, "user@nifi", "whatever");
|
||||
|
||||
// ensure the request is successful
|
||||
Assert.assertEquals(201, response.getStatus());
|
||||
|
|
|
@ -108,7 +108,7 @@ public class ContentViewerController extends HttpServlet {
|
|||
viewerContext.getRequestDispatcher("/message").forward(request, response);
|
||||
return;
|
||||
} catch (final AccessDeniedException ade) {
|
||||
request.setAttribute("title", "Acess Denied");
|
||||
request.setAttribute("title", "Access Denied");
|
||||
request.setAttribute("messages", "Unable to approve access to the specified content: " + ade.getMessage());
|
||||
|
||||
// forward to the error page
|
||||
|
|
|
@ -25,7 +25,7 @@ import javax.servlet.ServletRequest;
|
|||
import javax.servlet.ServletResponse;
|
||||
|
||||
/**
|
||||
* Filter for foward all requests to index.jsp.
|
||||
* Filter for forward all requests to index.jsp.
|
||||
*/
|
||||
public class CatchAllFilter implements Filter {
|
||||
|
||||
|
|
|
@ -17,8 +17,8 @@
|
|||
<%@ page contentType="text/html" pageEncoding="UTF-8" session="false" %>
|
||||
<%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core" %>
|
||||
<div id="provenance">
|
||||
<span id="intial-component-query" class="hidden"><c:out value="${param.componentId}"/></span>
|
||||
<span id="intial-flowfile-query" class="hidden"><c:out value="${param.flowFileUuid}"/></span>
|
||||
<span id="initial-component-query" class="hidden"><c:out value="${param.componentId}"/></span>
|
||||
<span id="initial-flowfile-query" class="hidden"><c:out value="${param.flowFileUuid}"/></span>
|
||||
<span id="nifi-controller-uri" class="hidden"></span>
|
||||
<span id="nifi-content-viewer-url" class="hidden"></span>
|
||||
<div id="provenance-header-text">NiFi Data Provenance</div>
|
||||
|
|
|
@ -199,7 +199,7 @@ nf.ng.Provenance = function (provenanceTableCtrl) {
|
|||
var searchTerms = {};
|
||||
|
||||
// look for a processor id in the query search
|
||||
var initialComponentId = $('#intial-component-query').text();
|
||||
var initialComponentId = $('#initial-component-query').text();
|
||||
if ($.trim(initialComponentId) !== '') {
|
||||
// populate initial search component
|
||||
$('input.searchable-component-id').val(initialComponentId);
|
||||
|
@ -209,7 +209,7 @@ nf.ng.Provenance = function (provenanceTableCtrl) {
|
|||
}
|
||||
|
||||
// look for a flowfile uuid in the query search
|
||||
var initialFlowFileUuid = $('#intial-flowfile-query').text();
|
||||
var initialFlowFileUuid = $('#initial-flowfile-query').text();
|
||||
if ($.trim(initialFlowFileUuid) !== '') {
|
||||
// populate initial search component
|
||||
$('input.searchable-flowfile-uuid').val(initialFlowFileUuid);
|
||||
|
|
|
@ -102,7 +102,7 @@ public class TestListHDFS {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testListingWithInalidELFunction() throws InterruptedException {
|
||||
public void testListingWithInvalidELFunction() throws InterruptedException {
|
||||
runner.setProperty(ListHDFS.DIRECTORY, "${literal('/test'):foo()}");
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
|
|
@ -75,24 +75,24 @@ public class TestEventTypeValidator {
|
|||
@Test
|
||||
public void inputWithInvalidEventTypeShouldProperlyDisplayEventsInExplanation() throws Exception {
|
||||
String subject = "subject";
|
||||
String input = "append, CREATE, cllose, rename, metadata, unlink";
|
||||
String input = "append, CREATE, invalidValue1, rename, metadata, unlink";
|
||||
ValidationResult result = eventTypeValidator.validate(subject, input, context);
|
||||
|
||||
assertEquals("subject", result.getSubject());
|
||||
assertEquals("append, CREATE, cllose, rename, metadata, unlink", result.getInput());
|
||||
assertEquals("The following are not valid event types: [cllose]", result.getExplanation());
|
||||
assertEquals("append, CREATE, invalidValue1, rename, metadata, unlink", result.getInput());
|
||||
assertEquals("The following are not valid event types: [invalidValue1]", result.getExplanation());
|
||||
assertFalse(result.isValid());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void inputWithMultipleInvalidEventTypeShouldProperlyDisplayEventsInExplanation() throws Exception {
|
||||
String subject = "subject";
|
||||
String input = "append, CREATE, cllose, rename, metadata, unlink, unllink";
|
||||
String input = "append, CREATE, invalidValue1, rename, metadata, unlink, invalidValue2";
|
||||
ValidationResult result = eventTypeValidator.validate(subject, input, context);
|
||||
|
||||
assertEquals("subject", result.getSubject());
|
||||
assertEquals("append, CREATE, cllose, rename, metadata, unlink, unllink", result.getInput());
|
||||
assertEquals("The following are not valid event types: [cllose, unllink]", result.getExplanation());
|
||||
assertEquals("append, CREATE, invalidValue1, rename, metadata, unlink, invalidValue2", result.getInput());
|
||||
assertEquals("The following are not valid event types: [invalidValue1, invalidValue2]", result.getExplanation());
|
||||
assertFalse(result.isValid());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -71,11 +71,11 @@ public abstract class AbstractPutHBase extends AbstractProcessor {
|
|||
protected static final AllowableValue ROW_ID_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
|
||||
"Stores the value of row id as a UTF-8 String.");
|
||||
protected static final AllowableValue ROW_ID_ENCODING_BINARY = new AllowableValue(BINARY_ENCODING_VALUE, BINARY_ENCODING_VALUE,
|
||||
"Stores the value of the rows id as a binary byte array. It expects that the row id is a binary formated string.");
|
||||
"Stores the value of the rows id as a binary byte array. It expects that the row id is a binary formatted string.");
|
||||
|
||||
static final PropertyDescriptor ROW_ID_ENCODING_STRATEGY = new PropertyDescriptor.Builder()
|
||||
.name("Row Identifier Encoding Strategy")
|
||||
.description("Specifies the data type of Row ID used when inserting data into HBase. The default behaviror is" +
|
||||
.description("Specifies the data type of Row ID used when inserting data into HBase. The default behavior is" +
|
||||
" to convert the row id to a UTF-8 byte array. Choosing Binary will convert a binary formatted string" +
|
||||
" to the correct byte[] representation. The Binary option should be used if you are using Binary row" +
|
||||
" keys in HBase")
|
||||
|
|
|
@ -83,7 +83,7 @@ public class TestPutHBaseCell {
|
|||
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
|
||||
|
||||
final String content = "some content";
|
||||
final Map<String, String> attributes = getAtrributeMapWithEL(tableName, row, columnFamily, columnQualifier);
|
||||
final Map<String, String> attributes = getAttributeMapWithEL(tableName, row, columnFamily, columnQualifier);
|
||||
runner.enqueue(content.getBytes("UTF-8"), attributes);
|
||||
|
||||
runner.run();
|
||||
|
@ -144,7 +144,7 @@ public class TestPutHBaseCell {
|
|||
|
||||
// this will go to success
|
||||
final String content2 = "some content2";
|
||||
final Map<String, String> attributes = getAtrributeMapWithEL("table", "row", "cf", "cq");
|
||||
final Map<String, String> attributes = getAttributeMapWithEL("table", "row", "cf", "cq");
|
||||
runner.enqueue(content2.getBytes("UTF-8"), attributes);
|
||||
|
||||
runner.run();
|
||||
|
@ -167,11 +167,11 @@ public class TestPutHBaseCell {
|
|||
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
|
||||
|
||||
final String content1 = "some content1";
|
||||
final Map<String, String> attributes1 = getAtrributeMapWithEL(tableName, row1, columnFamily, columnQualifier);
|
||||
final Map<String, String> attributes1 = getAttributeMapWithEL(tableName, row1, columnFamily, columnQualifier);
|
||||
runner.enqueue(content1.getBytes("UTF-8"), attributes1);
|
||||
|
||||
final String content2 = "some content1";
|
||||
final Map<String, String> attributes2 = getAtrributeMapWithEL(tableName, row2, columnFamily, columnQualifier);
|
||||
final Map<String, String> attributes2 = getAttributeMapWithEL(tableName, row2, columnFamily, columnQualifier);
|
||||
runner.enqueue(content2.getBytes("UTF-8"), attributes2);
|
||||
|
||||
runner.run();
|
||||
|
@ -205,11 +205,11 @@ public class TestPutHBaseCell {
|
|||
hBaseClient.setThrowException(true);
|
||||
|
||||
final String content1 = "some content1";
|
||||
final Map<String, String> attributes1 = getAtrributeMapWithEL(tableName, row1, columnFamily, columnQualifier);
|
||||
final Map<String, String> attributes1 = getAttributeMapWithEL(tableName, row1, columnFamily, columnQualifier);
|
||||
runner.enqueue(content1.getBytes("UTF-8"), attributes1);
|
||||
|
||||
final String content2 = "some content1";
|
||||
final Map<String, String> attributes2 = getAtrributeMapWithEL(tableName, row2, columnFamily, columnQualifier);
|
||||
final Map<String, String> attributes2 = getAttributeMapWithEL(tableName, row2, columnFamily, columnQualifier);
|
||||
runner.enqueue(content2.getBytes("UTF-8"), attributes2);
|
||||
|
||||
runner.run();
|
||||
|
@ -230,7 +230,7 @@ public class TestPutHBaseCell {
|
|||
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
|
||||
|
||||
final String content1 = "some content1";
|
||||
final Map<String, String> attributes1 = getAtrributeMapWithEL(tableName, row, columnFamily, columnQualifier);
|
||||
final Map<String, String> attributes1 = getAttributeMapWithEL(tableName, row, columnFamily, columnQualifier);
|
||||
runner.enqueue(content1.getBytes("UTF-8"), attributes1);
|
||||
|
||||
final String content2 = "some content1";
|
||||
|
@ -299,7 +299,7 @@ public class TestPutHBaseCell {
|
|||
|
||||
assertEquals(1, runner.getProvenanceEvents().size());
|
||||
}
|
||||
private Map<String, String> getAtrributeMapWithEL(String tableName, String row, String columnFamily, String columnQualifier) {
|
||||
private Map<String, String> getAttributeMapWithEL(String tableName, String row, String columnFamily, String columnQualifier) {
|
||||
final Map<String,String> attributes1 = new HashMap<>();
|
||||
attributes1.put("hbase.tableName", tableName);
|
||||
attributes1.put("hbase.row", row);
|
||||
|
|
|
@ -64,9 +64,9 @@ import java.util.regex.Pattern;
|
|||
+ "with the naming convention hiveql.args.N.type and hiveql.args.N.value, where N is a positive integer. The hiveql.args.N.type is expected to be "
|
||||
+ "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.")
|
||||
@ReadsAttributes({
|
||||
@ReadsAttribute(attribute = "hiveql.args.N.type", description = "Incoming FlowFiles are expected to be parameterized HiveQL statements. The type of each Parameter is specified as an integer "
|
||||
@ReadsAttribute(attribute = "hiveql.args.N.type", description = "Incoming FlowFiles are expected to be parametrized HiveQL statements. The type of each Parameter is specified as an integer "
|
||||
+ "that represents the JDBC Type of the parameter."),
|
||||
@ReadsAttribute(attribute = "hiveql.args.N.value", description = "Incoming FlowFiles are expected to be parameterized HiveQL statements. The value of the Parameters are specified as "
|
||||
@ReadsAttribute(attribute = "hiveql.args.N.value", description = "Incoming FlowFiles are expected to be parametrized HiveQL statements. The value of the Parameters are specified as "
|
||||
+ "hiveql.args.1.value, hiveql.args.2.value, hiveql.args.3.value, and so on. The type of the hiveql.args.1.value Parameter is specified by the hiveql.args.1.type attribute.")
|
||||
})
|
||||
public class PutHiveQL extends AbstractHiveQLProcessor {
|
||||
|
|
|
@ -165,7 +165,7 @@ public class TestSelectHiveQL {
|
|||
stmt.execute("create table TEST_NO_ROWS (id integer)");
|
||||
|
||||
runner.setIncomingConnection(false);
|
||||
// Try a valid SQL statment that will generate an error (val1 does not exist, e.g.)
|
||||
// Try a valid SQL statement that will generate an error (val1 does not exist, e.g.)
|
||||
runner.setProperty(SelectHiveQL.HIVEQL_SELECT_QUERY, "SELECT val1 FROM TEST_NO_ROWS");
|
||||
runner.run();
|
||||
|
||||
|
|
|
@ -102,7 +102,7 @@ public abstract class AbstractHTMLProcessor extends AbstractProcessor {
|
|||
public static final Relationship REL_NOT_FOUND = new Relationship.Builder()
|
||||
.name("element not found")
|
||||
.description("Element could not be found in the HTML document. The original HTML input will remain " +
|
||||
"in the flowfile content unchanged. Relationship '" + REL_ORIGINAL + "' will not be invoked " +
|
||||
"in the FlowFile content unchanged. Relationship '" + REL_ORIGINAL + "' will not be invoked " +
|
||||
"in this scenario.")
|
||||
.build();
|
||||
|
||||
|
|
|
@ -59,7 +59,7 @@ import org.apache.nifi.stream.io.StreamUtils;
|
|||
@CapabilityDescription("Stream the contents of a FlowFile to Ignite Cache using DataStreamer. " +
|
||||
"The processor uses the value of FlowFile attribute (Ignite cache entry key) as the " +
|
||||
"cache key and the byte array of the FlowFile as the value of the cache entry value. Both the string key and a " +
|
||||
" non-empty byte array value are required otherwise the FlowFile is transfered to the failure relation. " +
|
||||
" non-empty byte array value are required otherwise the FlowFile is transferred to the failure relation. " +
|
||||
"Note - The Ignite Kernel periodically outputs node performance statistics to the logs. This message " +
|
||||
" can be turned off by setting the log level for logger 'org.apache.ignite' to WARN in the logback.xml configuration file.")
|
||||
@WritesAttributes({
|
||||
|
@ -208,7 +208,7 @@ public class PutIgniteCache extends AbstractIgniteCacheProcessor {
|
|||
* Initialize ignite cache
|
||||
*/
|
||||
@OnScheduled
|
||||
public final void initilizeIgniteDataStreamer(ProcessContext context) throws ProcessException {
|
||||
public final void initializeIgniteDataStreamer(ProcessContext context) throws ProcessException {
|
||||
super.initializeIgniteCache(context);
|
||||
|
||||
if ( getIgniteDataStreamer() != null ) {
|
||||
|
|
|
@ -92,8 +92,8 @@ public class TestPutIgniteCache {
|
|||
runner.run(1, false, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_SUCCESS, 1);
|
||||
List<MockFlowFile> sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS);
|
||||
assertEquals(1, sucessfulFlowFiles.size());
|
||||
List<MockFlowFile> successfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS);
|
||||
assertEquals(1, successfulFlowFiles.size());
|
||||
List<MockFlowFile> failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE);
|
||||
assertEquals(0, failureFlowFiles.size());
|
||||
|
||||
|
@ -123,8 +123,8 @@ public class TestPutIgniteCache {
|
|||
runner.run(1, false, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_SUCCESS, 1);
|
||||
List<MockFlowFile> sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS);
|
||||
assertEquals(1, sucessfulFlowFiles.size());
|
||||
List<MockFlowFile> successfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS);
|
||||
assertEquals(1, successfulFlowFiles.size());
|
||||
List<MockFlowFile> failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE);
|
||||
assertEquals(0, failureFlowFiles.size());
|
||||
|
||||
|
@ -156,8 +156,8 @@ public class TestPutIgniteCache {
|
|||
|
||||
runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_SUCCESS, 2);
|
||||
|
||||
List<MockFlowFile> sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS);
|
||||
assertEquals(2, sucessfulFlowFiles.size());
|
||||
List<MockFlowFile> successfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS);
|
||||
assertEquals(2, successfulFlowFiles.size());
|
||||
List<MockFlowFile> failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE);
|
||||
assertEquals(0, failureFlowFiles.size());
|
||||
|
||||
|
@ -201,8 +201,8 @@ public class TestPutIgniteCache {
|
|||
|
||||
runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_SUCCESS, 2);
|
||||
|
||||
List<MockFlowFile> sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS);
|
||||
assertEquals(2, sucessfulFlowFiles.size());
|
||||
List<MockFlowFile> successfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS);
|
||||
assertEquals(2, successfulFlowFiles.size());
|
||||
List<MockFlowFile> failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE);
|
||||
assertEquals(0, failureFlowFiles.size());
|
||||
|
||||
|
@ -245,8 +245,8 @@ public class TestPutIgniteCache {
|
|||
runner.run(1, false, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_FAILURE, 1);
|
||||
List<MockFlowFile> sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS);
|
||||
assertEquals(0, sucessfulFlowFiles.size());
|
||||
List<MockFlowFile> successfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS);
|
||||
assertEquals(0, successfulFlowFiles.size());
|
||||
List<MockFlowFile> failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE);
|
||||
assertEquals(1, failureFlowFiles.size());
|
||||
|
||||
|
@ -277,8 +277,8 @@ public class TestPutIgniteCache {
|
|||
runner.run(1, false, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_FAILURE, 1);
|
||||
List<MockFlowFile> sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS);
|
||||
assertEquals(0, sucessfulFlowFiles.size());
|
||||
List<MockFlowFile> successfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS);
|
||||
assertEquals(0, successfulFlowFiles.size());
|
||||
List<MockFlowFile> failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE);
|
||||
assertEquals(1, failureFlowFiles.size());
|
||||
|
||||
|
@ -310,8 +310,8 @@ public class TestPutIgniteCache {
|
|||
runner.run(1, false, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_SUCCESS, 2);
|
||||
List<MockFlowFile> sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS);
|
||||
assertEquals(2, sucessfulFlowFiles.size());
|
||||
List<MockFlowFile> successfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS);
|
||||
assertEquals(2, successfulFlowFiles.size());
|
||||
List<MockFlowFile> failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE);
|
||||
assertEquals(0, failureFlowFiles.size());
|
||||
|
||||
|
@ -355,8 +355,8 @@ public class TestPutIgniteCache {
|
|||
runner.run(1, false, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_FAILURE, 2);
|
||||
List<MockFlowFile> sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS);
|
||||
assertEquals(0, sucessfulFlowFiles.size());
|
||||
List<MockFlowFile> successfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS);
|
||||
assertEquals(0, successfulFlowFiles.size());
|
||||
List<MockFlowFile> failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE);
|
||||
assertEquals(2, failureFlowFiles.size());
|
||||
|
||||
|
@ -400,8 +400,8 @@ public class TestPutIgniteCache {
|
|||
runner.enqueue("test2".getBytes(),properties2);
|
||||
runner.run(1, false, true);
|
||||
|
||||
List<MockFlowFile> sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS);
|
||||
assertEquals(1, sucessfulFlowFiles.size());
|
||||
List<MockFlowFile> successfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS);
|
||||
assertEquals(1, successfulFlowFiles.size());
|
||||
List<MockFlowFile> failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE);
|
||||
assertEquals(1, failureFlowFiles.size());
|
||||
|
||||
|
@ -445,8 +445,8 @@ public class TestPutIgniteCache {
|
|||
runner.enqueue("test2".getBytes());
|
||||
runner.run(1, false, true);
|
||||
|
||||
List<MockFlowFile> sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS);
|
||||
assertEquals(1, sucessfulFlowFiles.size());
|
||||
List<MockFlowFile> successfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS);
|
||||
assertEquals(1, successfulFlowFiles.size());
|
||||
List<MockFlowFile> failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE);
|
||||
assertEquals(1, failureFlowFiles.size());
|
||||
|
||||
|
@ -492,8 +492,8 @@ public class TestPutIgniteCache {
|
|||
runner.run(1, false, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_FAILURE, 2);
|
||||
List<MockFlowFile> sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS);
|
||||
assertEquals(0, sucessfulFlowFiles.size());
|
||||
List<MockFlowFile> successfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS);
|
||||
assertEquals(0, successfulFlowFiles.size());
|
||||
List<MockFlowFile> failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE);
|
||||
assertEquals(2, failureFlowFiles.size());
|
||||
|
||||
|
@ -538,8 +538,8 @@ public class TestPutIgniteCache {
|
|||
runner.enqueue("".getBytes(),properties2);
|
||||
runner.run(1, false, true);
|
||||
|
||||
List<MockFlowFile> sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS);
|
||||
assertEquals(1, sucessfulFlowFiles.size());
|
||||
List<MockFlowFile> successfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS);
|
||||
assertEquals(1, successfulFlowFiles.size());
|
||||
List<MockFlowFile> failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE);
|
||||
assertEquals(2, failureFlowFiles.size());
|
||||
|
||||
|
|
|
@ -117,7 +117,7 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl
|
|||
if (!this.configured) {
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.info("Configuring " + this.getClass().getSimpleName() + " for '"
|
||||
+ context.getProperty(CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue() + "' to be conected to '"
|
||||
+ context.getProperty(CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue() + "' to be connected to '"
|
||||
+ BROKER_URI + "'");
|
||||
}
|
||||
// will load user provided libraries/resources on the classpath
|
||||
|
|
|
@ -118,7 +118,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
|
|||
/**
|
||||
* Copies JMS attributes (i.e., headers and properties) as FF attributes.
|
||||
* Given that FF attributes mandate that values are of type String, the
|
||||
* copied values of JMS attributes will be stringified via
|
||||
* copied values of JMS attributes will be "stringified" via
|
||||
* String.valueOf(attribute).
|
||||
*/
|
||||
private FlowFile updateFlowFileAttributesWithJMSAttributes(Map<String, Object> jmsAttributes, FlowFile flowFile,
|
||||
|
|
|
@ -81,7 +81,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
|
|||
static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
|
||||
.name("topic")
|
||||
.displayName("Topic Name(s)")
|
||||
.description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma seperated.")
|
||||
.description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
|
|
|
@ -108,7 +108,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
|
|||
@Override
|
||||
public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
|
||||
logger.debug("Rebalance Alert: Paritions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
|
||||
//force a commit here. Can reuse the session and consumer after this but must commit now to avoid duplicates if kafka reassigns parittion
|
||||
//force a commit here. Can reuse the session and consumer after this but must commit now to avoid duplicates if kafka reassigns partition
|
||||
commit();
|
||||
}
|
||||
|
||||
|
|
|
@ -80,7 +80,7 @@ import kafka.message.MessageAndMetadata;
|
|||
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
|
||||
description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
|
||||
+ " In the event a dynamic property represents a property that was already set as part of the static properties, its value wil be"
|
||||
+ " overriden with warning message describing the override."
|
||||
+ " overridden with warning message describing the override."
|
||||
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.")
|
||||
public class GetKafka extends AbstractProcessor {
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@ import kafka.utils.ZKStringSerializer;
|
|||
import scala.collection.JavaConversions;
|
||||
|
||||
/**
|
||||
* Utility class to support interruction with Kafka internals.
|
||||
* Utility class to support interaction with Kafka internals.
|
||||
*
|
||||
*/
|
||||
class KafkaUtils {
|
||||
|
|
|
@ -272,9 +272,9 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
|
|||
* Will rendezvous with Kafka if {@link ProcessSession} contains {@link FlowFile}
|
||||
* producing a result {@link FlowFile}.
|
||||
* <br>
|
||||
* The result {@link FlowFile} that is successful is then transfered to {@link #REL_SUCCESS}
|
||||
* The result {@link FlowFile} that is successful is then transferred to {@link #REL_SUCCESS}
|
||||
* <br>
|
||||
* The result {@link FlowFile} that is failed is then transfered to {@link #REL_FAILURE}
|
||||
* The result {@link FlowFile} that is failed is then transferred to {@link #REL_FAILURE}
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
|
|
|
@ -41,7 +41,7 @@ public class GetKafkaIntegrationTests {
|
|||
private static EmbeddedKafkaProducerHelper producerHelper;
|
||||
|
||||
@BeforeClass
|
||||
public static void bforeClass(){
|
||||
public static void beforeClass(){
|
||||
kafkaLocal = new EmbeddedKafka();
|
||||
kafkaLocal.start();
|
||||
producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal);
|
||||
|
|
|
@ -54,7 +54,7 @@ socket.request.max.bytes=104857600
|
|||
|
||||
############################# Log Basics #############################
|
||||
|
||||
# A comma seperated list of directories under which to store log files
|
||||
# A comma separated list of directories under which to store log files
|
||||
log.dirs=target/kafka-tmp/kafka-logs
|
||||
|
||||
# The default number of log partitions per topic. More partitions allow greater
|
||||
|
|
|
@ -81,7 +81,7 @@ public class ConsumeKafka extends AbstractProcessor {
|
|||
static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
|
||||
.name("topic")
|
||||
.displayName("Topic Name(s)")
|
||||
.description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma seperated.")
|
||||
.description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
|
|
|
@ -92,7 +92,7 @@ public class TestLumberjackFrameHandler {
|
|||
0x00, 0x00, 0x00, 0x02, // Number of pairs
|
||||
0x00, 0x00, 0x00, 0x04, // Length of first pair key ('line')
|
||||
0x6C, 0x69, 0x6E, 0x65, // 'line'
|
||||
0x00, 0x00, 0x00, 0x0C, // Lenght of 'test-content'
|
||||
0x00, 0x00, 0x00, 0x0C, // Length of 'test-content'
|
||||
0x74, 0x65, 0x73, 0x74, //
|
||||
0x2d, 0x63, 0x6f, 0x6e, // 'test-content'
|
||||
0x74, 0x65, 0x6e, 0x74, //
|
||||
|
|
|
@ -1980,7 +1980,7 @@ public class TestPersistentProvenanceRepository {
|
|||
final ProvenanceEventRecord record = builder.build();
|
||||
try {
|
||||
repo.registerEvent(record);
|
||||
Assert.fail("Expected OutOfMmeoryError but was able to register event");
|
||||
Assert.fail("Expected OutOfMemoryError but was able to register event");
|
||||
} catch (final OutOfMemoryError oome) {
|
||||
}
|
||||
}
|
||||
|
@ -1991,7 +1991,7 @@ public class TestPersistentProvenanceRepository {
|
|||
final ProvenanceEventRecord record = builder.build();
|
||||
try {
|
||||
repo.registerEvent(record);
|
||||
Assert.fail("Expected OutOfMmeoryError but was able to register event");
|
||||
Assert.fail("Expected OutOfMemoryError but was able to register event");
|
||||
} catch (final IllegalStateException ise) {
|
||||
}
|
||||
}
|
||||
|
|
|
@ -139,7 +139,7 @@ public class TestRangerNiFiAuthorizer {
|
|||
Assert.fail("Should have thrown exception");
|
||||
} catch (AuthorizerCreationException e) {
|
||||
// want to make sure this exception is from our authorizer code
|
||||
veryifyOnlyAuthorizerCreationExceptions(e);
|
||||
verifyOnlyAuthorizeCreationExceptions(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -159,7 +159,7 @@ public class TestRangerNiFiAuthorizer {
|
|||
Assert.fail("Should have thrown exception");
|
||||
} catch (AuthorizerCreationException e) {
|
||||
// want to make sure this exception is from our authorizer code
|
||||
veryifyOnlyAuthorizerCreationExceptions(e);
|
||||
verifyOnlyAuthorizeCreationExceptions(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -180,11 +180,11 @@ public class TestRangerNiFiAuthorizer {
|
|||
Assert.fail("Should have thrown exception");
|
||||
} catch (AuthorizerCreationException e) {
|
||||
// want to make sure this exception is from our authorizer code
|
||||
veryifyOnlyAuthorizerCreationExceptions(e);
|
||||
verifyOnlyAuthorizeCreationExceptions(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void veryifyOnlyAuthorizerCreationExceptions(AuthorizerCreationException e) {
|
||||
private void verifyOnlyAuthorizeCreationExceptions(AuthorizerCreationException e) {
|
||||
boolean foundOtherException = false;
|
||||
Throwable cause = e.getCause();
|
||||
while (cause != null) {
|
||||
|
|
|
@ -28,7 +28,7 @@ public class TestGetTwitter {
|
|||
runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
|
||||
runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
|
||||
runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
|
||||
runner.setProperty(GetTwitter.ACCESS_TOKEN, "acessToken");
|
||||
runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken");
|
||||
runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret");
|
||||
runner.setProperty(GetTwitter.LOCATIONS, "-122.75,36.8,-121.75,37.8,-74,40,-73,41");
|
||||
runner.assertValid();
|
||||
|
@ -40,7 +40,7 @@ public class TestGetTwitter {
|
|||
runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
|
||||
runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
|
||||
runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
|
||||
runner.setProperty(GetTwitter.ACCESS_TOKEN, "acessToken");
|
||||
runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken");
|
||||
runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret");
|
||||
runner.setProperty(GetTwitter.LOCATIONS, "-122.75,36.8,-121.75,37.8,-74,40,-73,40");
|
||||
runner.assertNotValid();
|
||||
|
@ -52,7 +52,7 @@ public class TestGetTwitter {
|
|||
runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
|
||||
runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
|
||||
runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
|
||||
runner.setProperty(GetTwitter.ACCESS_TOKEN, "acessToken");
|
||||
runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken");
|
||||
runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret");
|
||||
runner.setProperty(GetTwitter.LOCATIONS, "-122.75,36.8,-121.75,37.8,-74,40,-74,41");
|
||||
runner.assertNotValid();
|
||||
|
@ -64,7 +64,7 @@ public class TestGetTwitter {
|
|||
runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
|
||||
runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
|
||||
runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
|
||||
runner.setProperty(GetTwitter.ACCESS_TOKEN, "acessToken");
|
||||
runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken");
|
||||
runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret");
|
||||
runner.setProperty(GetTwitter.LOCATIONS, "-122.75,36.8,-121.75,37.8,-74,40,-73,39");
|
||||
runner.assertNotValid();
|
||||
|
@ -76,7 +76,7 @@ public class TestGetTwitter {
|
|||
runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
|
||||
runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
|
||||
runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
|
||||
runner.setProperty(GetTwitter.ACCESS_TOKEN, "acessToken");
|
||||
runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken");
|
||||
runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret");
|
||||
runner.setProperty(GetTwitter.LOCATIONS, "-122.75,36.8,-121.75,37.8,-74,40,-75,41");
|
||||
runner.assertNotValid();
|
||||
|
|
|
@ -141,7 +141,7 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr
|
|||
stopWatch.stop();
|
||||
final String dataRate = stopWatch.calculateDataRate(flowFile.getSize());
|
||||
final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
|
||||
logger.info("Successfully transfered {} to {} on remote host {} in {} milliseconds at a rate of {}",
|
||||
logger.info("Successfully transferred {} to {} on remote host {} in {} milliseconds at a rate of {}",
|
||||
new Object[]{flowFile, fullPathRef.get(), hostname, millis, dataRate});
|
||||
|
||||
String fullPathWithSlash = fullPathRef.get();
|
||||
|
|
|
@ -89,12 +89,12 @@ import java.util.regex.Pattern;
|
|||
+ "are needed to complete the transaction."),
|
||||
@ReadsAttribute(attribute = "fragment.index", description = "If the <Support Fragment Transactions> property is true, this attribute is used to determine the order that the FlowFiles "
|
||||
+ "in a transaction should be evaluated."),
|
||||
@ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The type of each Parameter is specified as an integer "
|
||||
@ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming FlowFiles are expected to be parametrized SQL statements. The type of each Parameter is specified as an integer "
|
||||
+ "that represents the JDBC Type of the parameter."),
|
||||
@ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The value of the Parameters are specified as "
|
||||
@ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming FlowFiles are expected to be parametrized SQL statements. The value of the Parameters are specified as "
|
||||
+ "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."),
|
||||
@ReadsAttribute(attribute = "sql.args.N.format", description = "This attribute is always optional, but default options may not always work for your data. "
|
||||
+ "Incoming FlowFiles are expected to be parameterized SQL statements. In some cases "
|
||||
+ "Incoming FlowFiles are expected to be parametrized SQL statements. In some cases "
|
||||
+ "a format option needs to be specified, currently this is only applicable for binary data types. For binary data types "
|
||||
+ "available options are 'ascii', 'base64' and 'hex'. In 'ascii' format each string character in your attribute value represents a single byte, this is the default format "
|
||||
+ "and the format provided by Avro Processors. In 'base64' format your string is a Base64 encoded string. In 'hex' format the string is hex encoded with all "
|
||||
|
|
|
@ -93,7 +93,7 @@ public class ReplaceText extends AbstractProcessor {
|
|||
"Insert the Replacement Value at the beginning of the FlowFile or the beginning of each line (depending on the Evaluation Mode). For \"Line-by-Line\" Evaluation Mode, "
|
||||
+ "the value will be prepended to each line. For \"Entire Text\" evaluation mode, the value will be prepended to the entire text.");
|
||||
static final AllowableValue APPEND = new AllowableValue(appendValue, appendValue,
|
||||
"Insert the Replacement Value at the end of the FlowFile or the end of each line (depending on the Evluation Mode). For \"Line-by-Line\" Evaluation Mode, "
|
||||
"Insert the Replacement Value at the end of the FlowFile or the end of each line (depending on the Evaluation Mode). For \"Line-by-Line\" Evaluation Mode, "
|
||||
+ "the value will be appended to each line. For \"Entire Text\" evaluation mode, the value will be appended to the entire text.");
|
||||
static final AllowableValue LITERAL_REPLACE = new AllowableValue(literalReplaceValue, literalReplaceValue,
|
||||
"Search for all instances of the Search Value and replace the matches with the Replacement Value.");
|
||||
|
|
|
@ -64,7 +64,7 @@ import org.apache.nifi.stream.io.StreamUtils;
|
|||
+ "content. User-Defined properties do support the Attribute Expression Language, but the results are interpreted as "
|
||||
+ "literal values, not Regular Expressions")
|
||||
@DynamicProperty(name = "Relationship Name", value = "A Regular Expression", supportsExpressionLanguage = true, description = "Routes FlowFiles whose "
|
||||
+ "content matches the regular expressoin defined by Dynamic Property's value to the Relationship defined by the Dynamic Property's key")
|
||||
+ "content matches the regular expression defined by Dynamic Property's value to the Relationship defined by the Dynamic Property's key")
|
||||
@DynamicRelationship(name = "Name from Dynamic Property", description = "FlowFiles that match the Dynamic Property's Regular Expression")
|
||||
public class RouteOnContent extends AbstractProcessor {
|
||||
|
||||
|
|
|
@ -263,11 +263,11 @@ public class TransformXml extends AbstractProcessor {
|
|||
} else {
|
||||
String error = null;
|
||||
final File stylesheet = new File(input);
|
||||
final TransformerFactory tfactory = new net.sf.saxon.TransformerFactoryImpl();
|
||||
final TransformerFactory tFactory = new net.sf.saxon.TransformerFactoryImpl();
|
||||
final StreamSource styleSource = new StreamSource(stylesheet);
|
||||
|
||||
try {
|
||||
tfactory.newTransformer(styleSource);
|
||||
tFactory.newTransformer(styleSource);
|
||||
} catch (final Exception e) {
|
||||
error = e.toString();
|
||||
}
|
||||
|
|
|
@ -65,7 +65,7 @@ public class PGPUtil {
|
|||
}
|
||||
|
||||
try {
|
||||
// TODO: Can probably hardcode provider to BC and remove one method parameter
|
||||
// TODO: Can probably hard-code provider to BC and remove one method parameter
|
||||
PGPEncryptedDataGenerator encryptedDataGenerator = new PGPEncryptedDataGenerator(
|
||||
new JcePGPDataEncryptorBuilder(cipher).setWithIntegrityPacket(true).setSecureRandom(new SecureRandom()).setProvider(provider));
|
||||
|
||||
|
|
|
@ -189,7 +189,7 @@ public class TestExecuteSQL {
|
|||
stmt.execute("create table TEST_NO_ROWS (id integer)");
|
||||
|
||||
runner.setIncomingConnection(false);
|
||||
// Try a valid SQL statment that will generate an error (val1 does not exist, e.g.)
|
||||
// Try a valid SQL statement that will generate an error (val1 does not exist, e.g.)
|
||||
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT val1 FROM TEST_NO_ROWS");
|
||||
runner.run();
|
||||
|
||||
|
|
|
@ -140,7 +140,7 @@ public class TestInvokeHTTP extends TestInvokeHttpCommon {
|
|||
runner.run();
|
||||
Assert.fail();
|
||||
} catch (AssertionError e){
|
||||
// Expect assetion error when proxy port isn't set but host is.
|
||||
// Expect assertion error when proxy port isn't set but host is.
|
||||
}
|
||||
runner.setProperty(InvokeHTTP.PROP_PROXY_PORT, String.valueOf(proxyURL.getPort()));
|
||||
|
||||
|
@ -150,7 +150,7 @@ public class TestInvokeHTTP extends TestInvokeHttpCommon {
|
|||
runner.run();
|
||||
Assert.fail();
|
||||
} catch (AssertionError e){
|
||||
// Expect assetion error when proxy password isn't set but host is.
|
||||
// Expect assertion error when proxy password isn't set but host is.
|
||||
}
|
||||
runner.setProperty(InvokeHTTP.PROP_PROXY_PASSWORD, "password");
|
||||
|
||||
|
|
|
@ -89,15 +89,15 @@ public class TestListenSyslog {
|
|||
|
||||
// call onTrigger until we read all datagrams, or 30 seconds passed
|
||||
try {
|
||||
int numTransfered = 0;
|
||||
int numTransferred = 0;
|
||||
long timeout = System.currentTimeMillis() + 30000;
|
||||
|
||||
while (numTransfered < numMessages && System.currentTimeMillis() < timeout) {
|
||||
while (numTransferred < numMessages && System.currentTimeMillis() < timeout) {
|
||||
Thread.sleep(10);
|
||||
proc.onTrigger(context, processSessionFactory);
|
||||
numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
|
||||
numTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
|
||||
}
|
||||
Assert.assertEquals("Did not process all the datagrams", numMessages, numTransfered);
|
||||
Assert.assertEquals("Did not process all the datagrams", numMessages, numTransferred);
|
||||
|
||||
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
|
||||
checkFlowFile(flowFile, 0, ListenSyslog.UDP_VALUE.getValue());
|
||||
|
@ -142,15 +142,15 @@ public class TestListenSyslog {
|
|||
|
||||
// call onTrigger until we read all messages, or 30 seconds passed
|
||||
try {
|
||||
int numTransfered = 0;
|
||||
int nubTransferred = 0;
|
||||
long timeout = System.currentTimeMillis() + 30000;
|
||||
|
||||
while (numTransfered < numMessages && System.currentTimeMillis() < timeout) {
|
||||
while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) {
|
||||
Thread.sleep(10);
|
||||
proc.onTrigger(context, processSessionFactory);
|
||||
numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
|
||||
nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
|
||||
}
|
||||
Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
|
||||
Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred);
|
||||
|
||||
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
|
||||
checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
|
||||
|
@ -193,15 +193,15 @@ public class TestListenSyslog {
|
|||
|
||||
// call onTrigger until we read all messages, or 30 seconds passed
|
||||
try {
|
||||
int numTransfered = 0;
|
||||
int nubTransferred = 0;
|
||||
long timeout = System.currentTimeMillis() + 30000;
|
||||
|
||||
while (numTransfered < numMessages && System.currentTimeMillis() < timeout) {
|
||||
while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) {
|
||||
Thread.sleep(10);
|
||||
proc.onTrigger(context, processSessionFactory);
|
||||
numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
|
||||
nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
|
||||
}
|
||||
Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
|
||||
Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred);
|
||||
|
||||
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
|
||||
checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
|
||||
|
@ -244,15 +244,15 @@ public class TestListenSyslog {
|
|||
|
||||
// call onTrigger until we read all messages, or 30 seconds passed
|
||||
try {
|
||||
int numTransfered = 0;
|
||||
int nubTransferred = 0;
|
||||
long timeout = System.currentTimeMillis() + 30000;
|
||||
|
||||
while (numTransfered < numMessages && System.currentTimeMillis() < timeout) {
|
||||
while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) {
|
||||
Thread.sleep(10);
|
||||
proc.onTrigger(context, processSessionFactory);
|
||||
numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
|
||||
nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
|
||||
}
|
||||
Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
|
||||
Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred);
|
||||
|
||||
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
|
||||
checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
|
||||
|
@ -347,17 +347,17 @@ public class TestListenSyslog {
|
|||
|
||||
// call onTrigger until we read all messages, or 30 seconds passed
|
||||
try {
|
||||
int numTransfered = 0;
|
||||
int nubTransferred = 0;
|
||||
long timeout = System.currentTimeMillis() + 30000;
|
||||
|
||||
while (numTransfered < numMessages && System.currentTimeMillis() < timeout) {
|
||||
while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) {
|
||||
Thread.sleep(50);
|
||||
proc.onTrigger(context, processSessionFactory);
|
||||
numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size();
|
||||
nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size();
|
||||
}
|
||||
|
||||
// all messages should be transferred to invalid
|
||||
Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
|
||||
Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred);
|
||||
|
||||
} finally {
|
||||
// unschedule to close connections
|
||||
|
|
|
@ -172,7 +172,7 @@ public class TestListenUDP {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testRunWhenNoEventsAvailale() throws IOException, InterruptedException {
|
||||
public void testRunWhenNoEventsAvailable() throws IOException, InterruptedException {
|
||||
final List<StandardEvent> mockEvents = new ArrayList<>();
|
||||
|
||||
MockListenUDP mockListenUDP = new MockListenUDP(mockEvents);
|
||||
|
|
|
@ -107,7 +107,7 @@ public class TestMergeContent {
|
|||
final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
|
||||
bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
|
||||
|
||||
// create a reader for the merged contet
|
||||
// create a reader for the merged content
|
||||
byte[] data = runner.getContentAsByteArray(bundle);
|
||||
final Map<String, GenericRecord> users = getGenericRecordMap(data, schema, "name");
|
||||
|
||||
|
|
|
@ -223,7 +223,7 @@ public class TestMonitorActivity {
|
|||
List<MockFlowFile> inactiveFlowFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_INACTIVE);
|
||||
if (inactiveFlowFiles.size() == 1) {
|
||||
// Seems Threshold was not sufficient, which has caused One inactive message.
|
||||
// Step-up and rerun the test until successful or jUnit Timesout
|
||||
// Step-up and rerun the test until successful or jUnit times out
|
||||
threshold += threshold;
|
||||
rerun = true;
|
||||
} else {
|
||||
|
|
|
@ -53,7 +53,7 @@ public class TestPutDistributedMapCache {
|
|||
@Test
|
||||
public void testNoCacheKey() throws InitializationException {
|
||||
|
||||
runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${caheKeyAttribute}");
|
||||
runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}");
|
||||
runner.enqueue(new byte[] {});
|
||||
|
||||
runner.run();
|
||||
|
@ -66,10 +66,10 @@ public class TestPutDistributedMapCache {
|
|||
|
||||
@Test
|
||||
public void testSingleFlowFile() throws InitializationException, IOException {
|
||||
runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${caheKeyAttribute}");
|
||||
runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}");
|
||||
|
||||
final Map<String, String> props = new HashMap<>();
|
||||
props.put("caheKeyAttribute", "1");
|
||||
props.put("cacheKeyAttribute", "1");
|
||||
|
||||
String flowFileContent = "content";
|
||||
runner.enqueue(flowFileContent.getBytes("UTF-8"), props);
|
||||
|
@ -90,10 +90,10 @@ public class TestPutDistributedMapCache {
|
|||
|
||||
@Test
|
||||
public void testNothingToCache() throws InitializationException, IOException {
|
||||
runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${caheKeyAttribute}");
|
||||
runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}");
|
||||
|
||||
final Map<String, String> props = new HashMap<>();
|
||||
props.put("caheKeyAttribute", "2");
|
||||
props.put("cacheKeyAttribute", "2");
|
||||
|
||||
// flow file without content
|
||||
runner.enqueue(new byte[] {}, props);
|
||||
|
@ -132,11 +132,11 @@ public class TestPutDistributedMapCache {
|
|||
@Test
|
||||
public void testCacheStrategyReplace() throws InitializationException, IOException {
|
||||
|
||||
runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${caheKeyAttribute}");
|
||||
runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}");
|
||||
runner.setProperty(PutDistributedMapCache.CACHE_UPDATE_STRATEGY, PutDistributedMapCache.CACHE_UPDATE_REPLACE.getValue());
|
||||
|
||||
final Map<String, String> props = new HashMap<>();
|
||||
props.put("caheKeyAttribute", "replaceme");
|
||||
props.put("cacheKeyAttribute", "replaceme");
|
||||
|
||||
String original = "original";
|
||||
runner.enqueue(original.getBytes("UTF-8"), props);
|
||||
|
@ -176,11 +176,11 @@ public class TestPutDistributedMapCache {
|
|||
@Test
|
||||
public void testCacheStrategyKeepOriginal() throws InitializationException, IOException {
|
||||
|
||||
runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${caheKeyAttribute}");
|
||||
runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}");
|
||||
runner.setProperty(PutDistributedMapCache.CACHE_UPDATE_STRATEGY, PutDistributedMapCache.CACHE_UPDATE_KEEP_ORIGINAL.getValue());
|
||||
|
||||
final Map<String, String> props = new HashMap<>();
|
||||
props.put("caheKeyAttribute", "replaceme");
|
||||
props.put("cacheKeyAttribute", "replaceme");
|
||||
|
||||
String original = "original";
|
||||
runner.enqueue(original.getBytes("UTF-8"), props);
|
||||
|
|
|
@ -84,7 +84,7 @@ public class TestReplaceText {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testWithUnEscaped$InReplacemenmt() throws IOException {
|
||||
public void testWithUnEscaped$InReplacement() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new ReplaceText());
|
||||
runner.setValidateExpressionUsage(false);
|
||||
runner.setProperty(ReplaceText.SEARCH_VALUE, "(?s:^.*$)");
|
||||
|
|
|
@ -81,7 +81,7 @@ public class TestScanContent {
|
|||
|
||||
while (!runner.isQueueEmpty()) {
|
||||
runner.run(3);
|
||||
try { //must insert this deley or flowfiles are made so close together they become out of order in the queu
|
||||
try { //must insert this delay or flowfiles are made so close together they become out of order in the queue
|
||||
Thread.sleep(500);
|
||||
} catch (InterruptedException ex) {
|
||||
//moving on
|
||||
|
|
|
@ -456,7 +456,7 @@ public class StandardSSLContextService extends AbstractControllerService impleme
|
|||
|
||||
final int numProtocols = supportedProtocols.size();
|
||||
|
||||
// Sort for consistent presentation in configuraiton views
|
||||
// Sort for consistent presentation in configuration views
|
||||
final List<String> supportedProtocolList = new ArrayList<>(supportedProtocols);
|
||||
Collections.sort(supportedProtocolList);
|
||||
|
||||
|
|
|
@ -351,7 +351,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
|
|||
// is cloned for each matching rule. in 'use original' mode, this collection
|
||||
// will contain a single entry that maps a list of multiple rules. this is
|
||||
// because is the original flowfile is used for all matching rules. in this
|
||||
// case the order of the matching rules is perserved in the list
|
||||
// case the order of the matching rules is preserved in the list
|
||||
final Map<FlowFile, List<Rule>> matchedRules = new HashMap<>();
|
||||
|
||||
for (FlowFile flowFile : flowFiles) {
|
||||
|
|
Loading…
Reference in New Issue