NIFI-1157 resolved deprecated nifi-api items and ripple effects

This commit is contained in:
joewitt 2016-07-13 19:36:03 -04:00 committed by Mark Payne
parent 4c9d4655a8
commit 961be21a38
59 changed files with 135 additions and 1235 deletions

View File

@ -17,12 +17,12 @@
package org.apache.nifi.controller;
import java.util.Map;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.components.AbstractConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.annotation.OnConfigured;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.reporting.InitializationException;
@ -49,7 +49,7 @@ public abstract class AbstractControllerService extends AbstractConfigurableComp
return identifier;
}
@OnConfigured
@OnConfigurationRestored
public void onConfigurationChange(final ConfigurationContext context) {
this.configContext = context;
}

View File

@ -1,44 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Marker annotation a ControllerService implementation can use to indicate a
* method should be called after all of the properties have been set for the
* Controller Service. Methods using this annotation must take either 0
* arguments or a single argument of type
* {@link org.apache.nifi.controller.ConfigurationContext ConfigurationContext}.
*
*
* @deprecated This annotation has been replaced by those in the
* {@link org.apache.nifi.annotation.lifecycle} package.
*/
@Documented
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Deprecated
public @interface OnConfigured {
}

View File

@ -17,7 +17,6 @@
package org.apache.nifi.flowfile;
import java.util.Map;
import java.util.Set;
/**
* <p>
@ -86,25 +85,6 @@ public interface FlowFile extends Comparable<FlowFile> {
*/
long getQueueDateIndex();
/**
* <p>
* If a FlowFile is derived from multiple "parent" FlowFiles, all of the
* parents' Lineage Identifiers will be in the set.
* </p>
*
* @return a set of identifiers that are unique to this FlowFile's lineage.
* If FlowFile X is derived from FlowFile Y, both FlowFiles will have the
* same value for the Lineage Claim ID.
*
* @deprecated this collection was erroneously unbounded and caused a lot of OutOfMemoryError problems
* when dealing with FlowFiles with many ancestors. This Collection is
* now capped at 100 lineage identifiers. This method was introduced with the idea of providing
* future performance improvements but due to the high cost of heap consumption will not be used
* in such a manner. As a result, this method will be removed in a future release.
*/
@Deprecated
Set<String> getLineageIdentifiers();
/**
* @return true if flow file is currently penalized; false otherwise;
*/

View File

@ -1,42 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Annotation that may be placed on a processor allowing for a description to be
* provided. This description can be provided to a user in logs, UI, etc.
*
* @deprecated This Annotation has been replaced by the
* {@link org.apache.nifi.annotation.documentation.CapabilityDescription}
* annotation.
*/
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Deprecated
public @interface CapabilityDescription {
String value();
}

View File

@ -1,51 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* <p>
* Annotation that may be placed on a Processor that indicates to the framework
* that the Processor is eligible to be scheduled to run based on the occurrence
* of an "Event" (e.g., when a FlowFile is enqueued in an incoming Connection),
* rather than being triggered periodically.
* </p>
*
* <p>
* This Annotation should not be used in conjunction with
* {@link TriggerSerially} or {@link TriggerWhenEmpty}. If this Annotation is
* used with either of these other Annotations, the Processor will not be
* eligible to be scheduled in Event-Driven mode.
* </p>
*
* @deprecated This Annotation has been replaced by the
* {@link org.apache.nifi.annotation.behavior.EventDriven} annotation.
*/
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Deprecated
public @interface EventDriven {
}

View File

@ -1,43 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Marker annotation a processor implementation can use to indicate a method
* should be called whenever the processor is added to the graph. This method
* will be called once for the entire life of a processor instance.
*
* If any method annotated with this annotation throws, the processor will not
* be added to the graph.
*
* @deprecated This Annotation has been replaced by the
* {@link org.apache.nifi.annotation.lifecycle.OnAdded} annotation.
*/
@Documented
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Deprecated
public @interface OnAdded {
}

View File

@ -1,44 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Marker annotation a processor implementation can use to indicate a method
* should be called whenever the processor is removed from the graph. This
* method will be called once for the entire life of a processor instance unless
* an invocation of this method throws any Throwable.
*
* If any method annotated with this annotation throws, the processor will not
* be removed from the graph.
*
* @deprecated This Annotation has been replaced by the
* {@link org.apache.nifi.annotation.lifecycle.OnRemoved} annotation.
*/
@Documented
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Deprecated
public @interface OnRemoved {
}

View File

@ -1,46 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Marker annotation a processor implementation can use to indicate a method
* should be called whenever the processor is scheduled for processing. This
* will be called before any 'onTrigger' calls and will be called once each time
* a processor instance is scheduled to run. Methods using this annotation must
* take either 0 arguments or a single argument of type
* {@link org.apache.nifi.processor.SchedulingContext SchedulingContext}.
*
* If any method annotated with this annotation throws, the processor will not
* be scheduled to run.
*
* @deprecated This Annotation has been replaced by the
* {@link org.apache.nifi.annotation.lifecycle.OnScheduled} annotation.
*/
@Documented
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Deprecated
public @interface OnScheduled {
}

View File

@ -1,40 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Marker annotation a processor implementation can use to indicate a method
* should be called whenever the flow is being shutdown. This will be called at
* most once for each processor instance in a process lifetime.
*
* @deprecated This Annotation has been replaced by the
* {@link org.apache.nifi.annotation.lifecycle.OnShutdown} annotation.
*/
@Documented
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Deprecated
public @interface OnShutdown {
}

View File

@ -1,58 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* <p>
* Marker annotation a processor implementation can use to indicate a method
* should be called whenever the processor is no longer scheduled to run.
* Methods marked with this annotation will be invoked each time the processor
* is stopped and will be invoked only after the last thread has returned from
* the <code>onTrigger</code> method.
* </p>
*
* <p>
* This means that the thread executing in this method will be the only thread
* executing in any part of the Processor. However, since other threads may
* later execute other parts of the code, member variables must still be
* protected appropriately. However, access to multiple variables need not be
* atomic.
* </p>
*
* <p>
* To indicate that a method should be called immediately when a processor is no
* longer scheduled to run, see the {@link OnUnscheduled} annotation.
* </p>
*
* @deprecated This Annotation has been replaced by the
* {@link org.apache.nifi.annotation.lifecycle.OnStopped} annotation.
*/
@Documented
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Deprecated
public @interface OnStopped {
}

View File

@ -1,48 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* <p>
* Marker annotation a processor implementation can use to indicate a method
* should be called whenever the processor is no longer scheduled to run.
* Methods marked with this annotation will be invoked each time the framework
* is notified to stop scheduling the processor. This method is invoked as other
* threads are potentially running. To invoke a method after all threads have
* finished processing, see the {@link OnStopped} annotation.
* </p>
*
* If any method annotated with this annotation throws, the processor will not
* be scheduled to run.
*
* @deprecated This Annotation has been replaced by the
* {@link org.apache.nifi.annotation.lifecycle.OnUnscheduled} annotation.
*/
@Documented
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Deprecated
public @interface OnUnscheduled {
}

View File

@ -1,47 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Marker annotation a processor implementation can use to indicate that its
* operations on flow files can be safely repeated across process sessions. If a
* processor has this annotation and it allows the framework to manage session
* commit and rollback then the framework may elect to cascade a
* <code>ProcessSession</code> given to this processor's onTrigger method to the
* onTrigger method of another processor. It can do this knowing that if
* something fails along a series of processors using this same session that it
* can all be safely rolled back without any ill effects on external services
* which could not be rolled back and thus all the processes could be safely
* repeated (implied idempotent behavior).
*
* @deprecated This Annotation has been replaced by the
* {@link org.apache.nifi.annotation.behavior.SideEffectFree} annotation.
*/
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Deprecated
public @interface SideEffectFree {
}

View File

@ -1,53 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Marker annotation a Processor implementation can use to indicate that users
* should be able to supply a Batch Duration for the Processor. If a Processor
* uses this annotation, it is allowing the Framework to batch
* {@link org.apache.nifi.processor.ProcessSession ProcessSession}s' commits, as well as
* allowing the Framework to return the same ProcessSession multiple times from
* subsequent calls to
* {@link org.apache.nifi.processor.ProcessSessionFactory ProcessSessionFactory}.{@link org.apache.nifi.processor.ProcessSessionFactory#createSession() createSession()}.
*
* When this Annotation is used, it is important to note that calls to
* {@link org.apache.nifi.processor.ProcessSession#commit() ProcessSession.commit()} may
* not provide a guarantee that the data has been safely stored in NiFi's
* Content Repository or FlowFile Repository. Therefore, it is not appropriate,
* for instance, to use this annotation if the Processor will call
* ProcessSession.commit() to ensure data is persisted before deleting the data
* from a remote source.
*
* @deprecated This Annotation has been replaced by the
* {@link org.apache.nifi.annotation.behavior.SupportsBatching} annotation.
*/
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Deprecated
public @interface SupportsBatching {
}

View File

@ -1,43 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Annotation that can be applied to a {@link Processor} in order to associate
* tags with the processor. These tags do not affect the {@link Processor} in
* any way but serve as additional documentation and can be used to sort/filter
* Processors.
*
* @deprecated This Annotation has been replaced by the
* {@link org.apache.nifi.annotation.documentation.Tags} annotation.
*/
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Deprecated
public @interface Tags {
public String[] value();
}

View File

@ -1,41 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Marker annotation a processor implementation can use to indicate that the
* processor is not thread safe for concurrent execution of its onTrigger()
* method. By default processors are assumed to be thread safe for concurrent
* execution.
*
* @deprecated This Annotation has been replaced by the
* {@link org.apache.nifi.annotation.behavior.TriggerSerially} annotation.
*/
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Deprecated
public @interface TriggerSerially {
}

View File

@ -1,43 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Marker annotation a processor implementation can use to indicate that the
* processor is to be triggered if any of its destinations has available space
* for incoming FlowFiles. By default processors are triggered only when all
* destinations report that they have available space.
*
* @deprecated This Annotation has been replaced by the
* {@link org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable}
* annotation.
*/
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Deprecated
public @interface TriggerWhenAnyDestinationAvailable {
}

View File

@ -1,43 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Marker annotation a processor implementation can use to indicate that the
* processor should still be triggered even when it has no data in its work
* queue. By default, processors which have no non-self incoming edges will be
* triggered even if there is no work in its queue. However, processors that
* have non-self incoming edges will only be triggered if they have work in
* their queue or they present this annotation.
*
* @deprecated This Annotation has been replaced by the
* {@link org.apache.nifi.annotation.behavior.TriggerWhenEmpty} annotation.
*/
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Deprecated
public @interface TriggerWhenEmpty {
}

View File

@ -17,7 +17,6 @@
package org.apache.nifi.provenance;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.Processor;
@ -50,15 +49,6 @@ public interface ProvenanceEventBuilder {
*/
ProvenanceEventBuilder setFlowFileEntryDate(long entryDate);
/**
* Sets the Lineage Identifiers. This is a set of all FlowFile UUID's that
* were involved in making this event occur.
*
* @param lineageIdentifiers of the flowfiles in this event
* @return the builder
*/
ProvenanceEventBuilder setLineageIdentifiers(Set<String> lineageIdentifiers);
/**
* Sets the Content Claim that the FlowFile was previously associated with
* before this event occurred.

View File

@ -18,7 +18,6 @@ package org.apache.nifi.provenance;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Describes an event that happened to a FlowFile.
@ -48,19 +47,6 @@ public interface ProvenanceEventRecord {
*/
long getLineageStartDate();
/**
* @return the set of all lineage identifiers that are associated with the
* FlowFile for which this Event was created
*
* @deprecated this collection was erroneously unbounded and caused a lot of OutOfMemoryError problems
* when querying Provenance Events about FlowFiles with many ancestors. This Collection is
* now capped at 100 lineage identifiers. This method was introduced with the idea of providing
* future performance improvements but due to the high cost of heap consumption will not be used
* in such a manner. As a result, this method will be removed in a future release.
*/
@Deprecated
Set<String> getLineageIdentifiers();
/**
* @return the size of the FlowFile to which this Event is associated
*/

View File

@ -18,13 +18,6 @@ package org.apache.nifi.provenance.lineage;
public interface LineageNode {
/**
* @return the identifier of the Clustered NiFi Node that generated the
* event
*/
@Deprecated
String getClusterNodeIdentifier();
/**
* @return the type of the LineageNode
*/

View File

@ -16,8 +16,8 @@
*/
package org.apache.nifi.reporting;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.controller.annotation.OnConfigured;
/**
* Defines a task that is responsible for reporting status information to
@ -47,7 +47,7 @@ import org.apache.nifi.controller.annotation.OnConfigured;
*
* <p>
* ReportingTasks may choose to annotate a method with the
* {@link OnConfigured @OnConfigured} annotation. If this is done, that method
* {@link OnConfigurationRestored @OnConfigurationRestored} annotation. If this is done, that method
* will be invoked after all properties have been set for the ReportingTask and
* before it is scheduled to run. If the method throws an Exception, the
* ReportingTask will be Administratively yielded and will not run for the

View File

@ -59,11 +59,6 @@ public class PlaceholderProvenanceEvent implements ProvenanceEventRecord {
return 0;
}
@Override
public Set<String> getLineageIdentifiers() {
return Collections.emptySet();
}
@Override
public long getFileSize() {
return -1L;

View File

@ -20,10 +20,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -31,7 +29,6 @@ import org.apache.nifi.processor.Relationship;
/**
* Holder for provenance relevant information
* <p/>
*/
public final class StandardProvenanceEventRecord implements ProvenanceEventRecord {
@ -39,7 +36,6 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
private final long entryDate;
private final ProvenanceEventType eventType;
private final long lineageStartDate;
private final Set<String> lineageIdentifiers;
private final String componentId;
private final String componentType;
private final String transitUri;
@ -91,7 +87,6 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
this.storageFilename = builder.storageFilename;
this.eventDuration = builder.eventDuration;
this.lineageStartDate = builder.lineageStartDate;
this.lineageIdentifiers = Collections.unmodifiableSet(builder.lineageIdentifiers);
previousClaimSection = builder.previousClaimSection;
previousClaimContainer = builder.previousClaimContainer;
@ -109,7 +104,7 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
updatedAttributes = builder.updatedAttributes == null ? Collections.<String, String>emptyMap() : Collections.unmodifiableMap(builder.updatedAttributes);
sourceQueueIdentifier = builder.sourceQueueIdentifier;
}
public String getStorageFilename() {
@ -134,11 +129,6 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
return eventTime;
}
@Override
public Set<String> getLineageIdentifiers() {
return lineageIdentifiers;
}
@Override
public long getLineageStartDate() {
return lineageStartDate;
@ -414,7 +404,6 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
private long eventTime = System.currentTimeMillis();
private long entryDate;
private long lineageStartDate;
private Set<String> lineageIdentifiers = new HashSet<>();
private ProvenanceEventType eventType = null;
private String componentId = null;
private String componentType = null;
@ -453,7 +442,6 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
eventTime = event.getEventTime();
entryDate = event.getFlowFileEntryDate();
lineageStartDate = event.getLineageStartDate();
lineageIdentifiers = event.getLineageIdentifiers();
eventType = event.getEventType();
componentId = event.getComponentId();
componentType = event.getComponentType();
@ -498,12 +486,6 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
return this;
}
@Override
public Builder setLineageIdentifiers(final Set<String> lineageIdentifiers) {
this.lineageIdentifiers = lineageIdentifiers;
return this;
}
@Override
public Builder setAttributes(final Map<String, String> previousAttributes, final Map<String, String> updatedAttributes) {
this.previousAttributes = previousAttributes;
@ -541,11 +523,6 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
return this;
}
public Builder addLineageIdentifier(final String lineageIdentifier) {
this.lineageIdentifiers.add(lineageIdentifier);
return this;
}
@Override
public Builder setEventType(ProvenanceEventType eventType) {
this.eventType = eventType;
@ -661,7 +638,6 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
@Override
public ProvenanceEventBuilder fromFlowFile(final FlowFile flowFile) {
setFlowFileEntryDate(flowFile.getEntryDate());
setLineageIdentifiers(flowFile.getLineageIdentifiers());
setLineageStartDate(flowFile.getLineageStartDate());
setAttributes(Collections.<String, String>emptyMap(), flowFile.getAttributes());
uuid = flowFile.getAttribute(CoreAttributes.UUID.key());

View File

@ -36,17 +36,6 @@ public class EventNode implements ProvenanceEventLineageNode {
return String.valueOf(getEventIdentifier());
}
@Deprecated
@Override
public String getClusterNodeIdentifier() {
return clusterNodeIdentifier;
}
@Deprecated
public void setClusterNodeIdentifier(final String nodeIdentifier) {
this.clusterNodeIdentifier = nodeIdentifier;
}
@Override
public LineageNodeType getNodeType() {
return LineageNodeType.PROVENANCE_EVENT_NODE;

View File

@ -22,7 +22,6 @@ public class FlowFileNode implements LineageNode {
private final String flowFileUuid;
private final long creationTime;
private String clusterNodeIdentifier;
public FlowFileNode(final String flowFileUuid, final long flowFileCreationTime) {
this.flowFileUuid = requireNonNull(flowFileUuid);
@ -39,12 +38,6 @@ public class FlowFileNode implements LineageNode {
return creationTime;
}
@Deprecated
@Override
public String getClusterNodeIdentifier() {
return clusterNodeIdentifier;
}
@Override
public LineageNodeType getNodeType() {
return LineageNodeType.FLOWFILE_NODE;

View File

@ -341,7 +341,6 @@ public class TestQuery {
Mockito.when(mockFlowFile.getId()).thenReturn(1L);
Mockito.when(mockFlowFile.getEntryDate()).thenReturn(System.currentTimeMillis());
Mockito.when(mockFlowFile.getSize()).thenReturn(1L);
Mockito.when(mockFlowFile.getLineageIdentifiers()).thenReturn(new HashSet<String>());
Mockito.when(mockFlowFile.getLineageStartDate()).thenReturn(System.currentTimeMillis());
return Query.evaluateExpressions(queryString, mockFlowFile);
}

View File

@ -45,7 +45,6 @@ public class MockFlowFile implements FlowFileRecord {
private final long id;
private final long entryDate;
private final Set<String> lineageIdentifiers = new HashSet<>();
private final long creationTime;
private boolean penalized = false;
@ -60,7 +59,6 @@ public class MockFlowFile implements FlowFileRecord {
final String uuid = UUID.randomUUID().toString();
attributes.put(CoreAttributes.UUID.key(), uuid);
lineageIdentifiers.add(uuid);
}
public MockFlowFile(final long id, final FlowFile toCopy) {
@ -69,8 +67,6 @@ public class MockFlowFile implements FlowFileRecord {
final byte[] dataToCopy = ((MockFlowFile) toCopy).data;
this.data = new byte[dataToCopy.length];
System.arraycopy(dataToCopy, 0, this.data, 0, dataToCopy.length);
lineageIdentifiers.addAll(toCopy.getLineageIdentifiers());
}
void setPenalized() {
@ -81,11 +77,6 @@ public class MockFlowFile implements FlowFileRecord {
return creationTime;
}
@Override
public Set<String> getLineageIdentifiers() {
return lineageIdentifiers;
}
@Override
public long getLineageStartDate() {
return entryDate;

View File

@ -21,9 +21,7 @@ import static java.util.Objects.requireNonNull;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@ -87,18 +85,9 @@ public class StandardProcessorTestRunner implements TestRunner {
private int numThreads = 1;
private final AtomicInteger invocations = new AtomicInteger(0);
private static final Set<Class<? extends Annotation>> deprecatedTypeAnnotations = new HashSet<>();
private static final Set<Class<? extends Annotation>> deprecatedMethodAnnotations = new HashSet<>();
private final Map<String, MockComponentLog> controllerServiceLoggers = new HashMap<>();
private final MockComponentLog logger;
static {
// do this in a separate method, just so that we can add a @SuppressWarnings annotation
// because we want to indicate explicitly that we know that we are using deprecated
// classes here.
populateDeprecatedMethods();
}
StandardProcessorTestRunner(final Processor processor) {
this.processor = processor;
this.idGenerator = new AtomicLong(0L);
@ -108,8 +97,6 @@ public class StandardProcessorTestRunner implements TestRunner {
this.processorStateManager = new MockStateManager(processor);
this.context = new MockProcessContext(processor, processorStateManager);
detectDeprecatedAnnotations(processor);
final MockProcessorInitializationContext mockInitContext = new MockProcessorInitializationContext(processor, context);
processor.initialize(mockInitContext);
logger = mockInitContext.getLogger();
@ -125,42 +112,6 @@ public class StandardProcessorTestRunner implements TestRunner {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, processor);
}
@SuppressWarnings("deprecation")
private static void populateDeprecatedMethods() {
deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.CapabilityDescription.class);
deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.EventDriven.class);
deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.SideEffectFree.class);
deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.SupportsBatching.class);
deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.Tags.class);
deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.TriggerWhenEmpty.class);
deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable.class);
deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.TriggerSerially.class);
deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnRemoved.class);
deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnAdded.class);
deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnScheduled.class);
deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnShutdown.class);
deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnStopped.class);
deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnUnscheduled.class);
}
private static void detectDeprecatedAnnotations(final Processor processor) {
for (final Class<? extends Annotation> annotationClass : deprecatedTypeAnnotations) {
if (processor.getClass().isAnnotationPresent(annotationClass)) {
Assert.fail("Processor is using deprecated Annotation " + annotationClass.getCanonicalName());
}
}
for (final Class<? extends Annotation> annotationClass : deprecatedMethodAnnotations) {
for (final Method method : processor.getClass().getMethods()) {
if (method.isAnnotationPresent(annotationClass)) {
Assert.fail("Processor is using deprecated Annotation " + annotationClass.getCanonicalName() + " for method " + method);
}
}
}
}
@Override
public void setValidateExpressionUsage(final boolean validate) {
context.setValidateExpressionUsage(validate);

View File

@ -29,7 +29,6 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.junit.Ignore;
import org.junit.Test;
public class TestStandardProcessorTestRunner {
@ -111,39 +110,6 @@ public class TestStandardProcessorTestRunner {
runner.assertAllFlowFilesContainAttribute(AddAttributeProcessor.KEY);
}
@Test(expected = AssertionError.class)
@Ignore("This should not be enabled until we actually fail processor unit tests for using deprecated methods")
public void testFailOnDeprecatedTypeAnnotation() {
new StandardProcessorTestRunner(new DeprecatedAnnotation());
}
@Test
@Ignore("This should not be enabled until we actually fail processor unit tests for using deprecated methods")
public void testDoesNotFailOnNonDeprecatedTypeAnnotation() {
new StandardProcessorTestRunner(new NewAnnotation());
}
@Test(expected = AssertionError.class)
@Ignore("This should not be enabled until we actually fail processor unit tests for using deprecated methods")
public void testFailOnDeprecatedMethodAnnotation() {
new StandardProcessorTestRunner(new DeprecatedMethodAnnotation());
}
@Test
@Ignore("This should not be enabled until we actually fail processor unit tests for using deprecated methods")
public void testDoesNotFailOnNonDeprecatedMethodAnnotation() {
new StandardProcessorTestRunner(new NewMethodAnnotation());
}
@SuppressWarnings("deprecation")
@org.apache.nifi.processor.annotation.Tags({"deprecated"})
private static class DeprecatedAnnotation extends AbstractProcessor {
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
}
@org.apache.nifi.annotation.documentation.Tags({"deprecated"})
private static class NewAnnotation extends AbstractProcessor {
@ -164,19 +130,6 @@ public class TestStandardProcessorTestRunner {
}
}
private static class DeprecatedMethodAnnotation extends AbstractProcessor {
@SuppressWarnings("deprecation")
@org.apache.nifi.processor.annotation.OnScheduled
public void dummy() {
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
}
private static class ProcessorWithOnStop extends AbstractProcessor {
private int callsWithContext = 0;

View File

@ -29,7 +29,6 @@ import org.apache.nifi.processor.io.InputStreamCallback;
import static org.apache.nifi.processors.flume.util.FlowFileEventConstants.ENTRY_DATE_HEADER;
import static org.apache.nifi.processors.flume.util.FlowFileEventConstants.ID_HEADER;
import static org.apache.nifi.processors.flume.util.FlowFileEventConstants.LAST_QUEUE_DATE_HEADER;
import static org.apache.nifi.processors.flume.util.FlowFileEventConstants.LINEAGE_IDENTIFIERS_HEADER;
import static org.apache.nifi.processors.flume.util.FlowFileEventConstants.LINEAGE_START_DATE_HEADER;
import static org.apache.nifi.processors.flume.util.FlowFileEventConstants.SIZE_HEADER;
@ -65,11 +64,6 @@ public class FlowFileEvent implements Event {
headers.put(ENTRY_DATE_HEADER, Long.toString(flowFile.getEntryDate()));
headers.put(ID_HEADER, Long.toString(flowFile.getId()));
headers.put(LAST_QUEUE_DATE_HEADER, Long.toString(flowFile.getLastQueueDate()));
int i = 0;
for (String lineageIdentifier : flowFile.getLineageIdentifiers()) {
headers.put(LINEAGE_IDENTIFIERS_HEADER + "." + i, lineageIdentifier);
i++;
}
headers.put(LINEAGE_START_DATE_HEADER, Long.toString(flowFile.getLineageStartDate()));
headers.put(SIZE_HEADER, Long.toString(flowFile.getSize()));
headersLoaded = true;

View File

@ -28,9 +28,6 @@ public class FlowFileEventConstants {
// FlowFile#getLastQueueDate();
public static final String LAST_QUEUE_DATE_HEADER = "nifi.last.queue.date";
// FlowFile#getLineageIdentifiers();
public static final String LINEAGE_IDENTIFIERS_HEADER = "nifi.lineage.identifiers";
// FlowFile#getLineageStartDate();
public static final String LINEAGE_START_DATE_HEADER = "nifi.lineage.start.date";

View File

@ -51,7 +51,7 @@ public class ControllerServiceInitializer implements ConfigurableComponentInitia
final ComponentLog logger = new MockComponentLogger();
final MockConfigurationContext context = new MockConfigurationContext();
ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, controllerService, logger, context);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, controllerService, logger, context);
}
}
}

View File

@ -49,7 +49,7 @@ public class ProcessorInitializer implements ConfigurableComponentInitializer {
final ComponentLog logger = new MockComponentLogger();
final MockProcessContext context = new MockProcessContext();
ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, processor, logger, context);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, processor, logger, context);
}
}
}

View File

@ -48,7 +48,7 @@ public class ReportingTaskingInitializer implements ConfigurableComponentInitial
try (NarCloseable narCloseable = NarCloseable.withNarLoader()) {
final MockConfigurationContext context = new MockConfigurationContext();
ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, reportingTask, new MockComponentLogger(), context);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, reportingTask, new MockComponentLogger(), context);
}
}
}

View File

@ -19,121 +19,115 @@ package org.apache.nifi.documentation.util;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import org.apache.nifi.logging.ComponentLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class is a copy of org.apache.nifi.util.ReflectionUtils. Ultimately the documentation generation
* component should be moved to a place where it can depend on this directly instead of copying it in.
* This class is a copy of org.apache.nifi.util.ReflectionUtils. Ultimately the
* documentation generation component should be moved to a place where it can
* depend on this directly instead of copying it in.
*
*
*/
public class ReflectionUtils {
private final static Logger LOG = LoggerFactory.getLogger(ReflectionUtils.class);
/**
* Invokes all methods on the given instance that have been annotated with the given preferredAnnotation and if no such method exists will invoke all methods on the given instance that have been
* annotated with the given alternateAnnotation, if any exists. If the signature of the method that is defined in <code>instance</code> uses 1 or more parameters, those parameters must be
* specified by the <code>args</code> parameter. However, if more arguments are supplied by the <code>args</code> parameter than needed, the extra arguments will be ignored.
* Invokes all methods on the given instance that have been annotated with
* the given annotation. If the signature of the method that is defined in
* <code>instance</code> uses 1 or more parameters, those parameters must be
* specified by the <code>args</code> parameter. However, if more arguments
* are supplied by the <code>args</code> parameter than needed, the extra
* arguments will be ignored.
*
* @param preferredAnnotation preferred
* @param alternateAnnotation alternate
* @param annotation annotation
* @param instance instance
* @param logger the ComponentLog to use for logging any errors. If null, will use own logger, but that will not generate bulletins or easily tie to the Processor's log messages.
* @param logger the ComponentLog to use for logging any errors. If null,
* will use own logger, but that will not generate bulletins or easily tie
* to the Processor's log messages.
* @param args args
* @return <code>true</code> if all appropriate methods were invoked and returned without throwing an Exception, <code>false</code> if one of the methods threw an Exception or could not be
* invoked; if <code>false</code> is returned, an error will have been logged.
* @return <code>true</code> if all appropriate methods were invoked and
* returned without throwing an Exception, <code>false</code> if one of the
* methods threw an Exception or could not be invoked; if <code>false</code>
* is returned, an error will have been logged.
*/
public static boolean quietlyInvokeMethodsWithAnnotations(
final Class<? extends Annotation> preferredAnnotation, final Class<? extends Annotation> alternateAnnotation, final Object instance, final ComponentLog logger, final Object... args) {
final List<Class<? extends Annotation>> annotationClasses = new ArrayList<>(alternateAnnotation == null ? 1 : 2);
annotationClasses.add(preferredAnnotation);
if (alternateAnnotation != null) {
annotationClasses.add(alternateAnnotation);
}
public static boolean quietlyInvokeMethodsWithAnnotation(
final Class<? extends Annotation> annotation, final Object instance, final ComponentLog logger, final Object... args) {
boolean annotationFound = false;
for (final Class<? extends Annotation> annotationClass : annotationClasses) {
if (annotationFound) {
break;
}
for (final Method method : instance.getClass().getMethods()) {
if (method.isAnnotationPresent(annotation)) {
for (final Method method : instance.getClass().getMethods()) {
if (method.isAnnotationPresent(annotationClass)) {
annotationFound = true;
final boolean isAccessible = method.isAccessible();
method.setAccessible(true);
final boolean isAccessible = method.isAccessible();
method.setAccessible(true);
try {
final Class<?>[] argumentTypes = method.getParameterTypes();
if (argumentTypes.length > args.length) {
if (logger == null) {
LOG.error("Unable to invoke method {} on {} because method expects {} parameters but only {} were given",
new Object[]{method.getName(), instance, argumentTypes.length, args.length});
} else {
logger.error("Unable to invoke method {} on {} because method expects {} parameters but only {} were given",
new Object[]{method.getName(), instance, argumentTypes.length, args.length});
}
return false;
}
for (int i = 0; i < argumentTypes.length; i++) {
final Class<?> argType = argumentTypes[i];
if (!argType.isAssignableFrom(args[i].getClass())) {
if (logger == null) {
LOG.error("Unable to invoke method {} on {} because method parameter {} is expected to be of type {} but argument passed was of type {}",
new Object[]{method.getName(), instance, i, argType, args[i].getClass()});
} else {
logger.error("Unable to invoke method {} on {} because method parameter {} is expected to be of type {} but argument passed was of type {}",
new Object[]{method.getName(), instance, i, argType, args[i].getClass()});
}
return false;
}
}
try {
final Class<?>[] argumentTypes = method.getParameterTypes();
if (argumentTypes.length > args.length) {
if (logger == null) {
LOG.error("Unable to invoke method {} on {} because method expects {} parameters but only {} were given",
new Object[]{method.getName(), instance, argumentTypes.length, args.length});
} else {
logger.error("Unable to invoke method {} on {} because method expects {} parameters but only {} were given",
new Object[]{method.getName(), instance, argumentTypes.length, args.length});
if (argumentTypes.length == args.length) {
method.invoke(instance, args);
} else {
final Object[] argsToPass = new Object[argumentTypes.length];
for (int i = 0; i < argsToPass.length; i++) {
argsToPass[i] = args[i];
}
return false;
method.invoke(instance, argsToPass);
}
} catch (final InvocationTargetException ite) {
if (logger == null) {
LOG.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, ite.getCause()});
LOG.error("", ite.getCause());
} else {
logger.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, ite.getCause()});
}
} catch (final IllegalAccessException | IllegalArgumentException t) {
if (logger == null) {
LOG.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, t});
LOG.error("", t);
} else {
logger.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, t});
}
for (int i = 0; i < argumentTypes.length; i++) {
final Class<?> argType = argumentTypes[i];
if (!argType.isAssignableFrom(args[i].getClass())) {
if (logger == null) {
LOG.error("Unable to invoke method {} on {} because method parameter {} is expected to be of type {} but argument passed was of type {}",
new Object[]{method.getName(), instance, i, argType, args[i].getClass()});
} else {
logger.error("Unable to invoke method {} on {} because method parameter {} is expected to be of type {} but argument passed was of type {}",
new Object[]{method.getName(), instance, i, argType, args[i].getClass()});
}
return false;
}
}
try {
if (argumentTypes.length == args.length) {
method.invoke(instance, args);
} else {
final Object[] argsToPass = new Object[argumentTypes.length];
for (int i = 0; i < argsToPass.length; i++) {
argsToPass[i] = args[i];
}
method.invoke(instance, argsToPass);
}
} catch (final InvocationTargetException ite) {
if (logger == null) {
LOG.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, ite.getCause()});
LOG.error("", ite.getCause());
} else {
logger.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, ite.getCause()});
}
} catch (final IllegalAccessException | IllegalArgumentException t) {
if (logger == null) {
LOG.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, t});
LOG.error("", t);
} else {
logger.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, t});
}
return false;
}
} finally {
if (!isAccessible) {
method.setAccessible(false);
}
return false;
}
} finally {
if (!isAccessible) {
method.setAccessible(false);
}
}
}
}
return true;
}
}

View File

@ -33,10 +33,8 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
@ -74,7 +72,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");
private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap\\.part");
public static final int SWAP_ENCODING_VERSION = 9;
public static final int SWAP_ENCODING_VERSION = 10;
public static final String EVENT_CATEGORY = "Swap FlowFiles";
private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class);
@ -320,13 +318,6 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
for (final FlowFileRecord flowFile : toSwap) {
out.writeLong(flowFile.getId());
out.writeLong(flowFile.getEntryDate());
final Set<String> lineageIdentifiers = flowFile.getLineageIdentifiers();
out.writeInt(lineageIdentifiers.size());
for (final String lineageId : lineageIdentifiers) {
out.writeUTF(lineageId);
}
out.writeLong(flowFile.getLineageStartDate());
out.writeLong(flowFile.getLineageStartIndex());
out.writeLong(flowFile.getLastQueueDate());
@ -443,12 +434,12 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
if (serializationVersion > 1) {
// Lineage information was added in version 2
final int numLineageIdentifiers = in.readInt();
final Set<String> lineageIdentifiers = new HashSet<>(numLineageIdentifiers);
for (int lineageIdIdx = 0; lineageIdIdx < numLineageIdentifiers; lineageIdIdx++) {
lineageIdentifiers.add(in.readUTF());
if(serializationVersion < 10){
final int numLineageIdentifiers = in.readInt();
for (int lineageIdIdx = 0; lineageIdIdx < numLineageIdentifiers; lineageIdIdx++) {
in.readUTF(); //skip each identifier
}
}
ffBuilder.lineageIdentifiers(lineageIdentifiers);
// version 9 adds in a 'lineage start index'
final long lineageStartDate = in.readLong();

View File

@ -1019,7 +1019,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
if (firstTimeAdded) {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.invokeMethodsWithAnnotations(OnAdded.class, org.apache.nifi.processor.annotation.OnAdded.class, processor);
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor);
} catch (final Exception e) {
logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID);
throw new ComponentLifeCycleException("Failed to invoke @OnAdded methods of " + procNode.getProcessor(), e);
@ -3662,11 +3662,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final String parentUUID = event.getFlowFileUuid();
// Create the FlowFile Record
final Set<String> lineageIdentifiers = new HashSet<>();
lineageIdentifiers.addAll(event.getLineageIdentifiers());
lineageIdentifiers.add(parentUUID);
final String newFlowFileUUID = UUID.randomUUID().toString();
// We need to create a new FlowFile by populating it with information from the
@ -3685,7 +3680,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
.contentClaimOffset(0L) // use 0 because we used the content claim offset in the Content Claim itself
.entryDate(System.currentTimeMillis())
.id(flowFileRepository.getNextFlowFileSequence())
.lineageIdentifiers(lineageIdentifiers)
.lineageStart(event.getLineageStartDate(), 0L)
.size(contentSize.longValue())
// Create a new UUID and add attributes indicating that this is a replay

View File

@ -151,7 +151,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
identifier = new AtomicReference<>(uuid);
destinations = new HashMap<>();
connections = new HashMap<>();
incomingConnectionsRef = new AtomicReference<List<Connection>>(new ArrayList<Connection>());
incomingConnectionsRef = new AtomicReference<>(new ArrayList<>());
lossTolerant = new AtomicBoolean(false);
final Set<Relationship> emptySetOfRelationships = new HashSet<>();
undefinedRelationshipsToTerminate = new AtomicReference<>(emptySetOfRelationships);
@ -169,20 +169,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
penalizationPeriod = new AtomicReference<>(DEFAULT_PENALIZATION_PERIOD);
final Class<?> procClass = processor.getClass();
triggerWhenEmpty = procClass.isAnnotationPresent(TriggerWhenEmpty.class)
|| procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerWhenEmpty.class);
sideEffectFree = procClass.isAnnotationPresent(SideEffectFree.class)
|| procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.SideEffectFree.class);
batchSupported = procClass.isAnnotationPresent(SupportsBatching.class)
|| procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.SupportsBatching.class);
triggeredSerially = procClass.isAnnotationPresent(TriggerSerially.class)
|| procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerSerially.class);
triggerWhenAnyDestinationAvailable = procClass.isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class)
|| procClass.isAnnotationPresent(
org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable.class);
eventDrivenSupported = (procClass.isAnnotationPresent(EventDriven.class)
|| procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.EventDriven.class))
&& !triggeredSerially && !triggerWhenEmpty;
triggerWhenEmpty = procClass.isAnnotationPresent(TriggerWhenEmpty.class);
sideEffectFree = procClass.isAnnotationPresent(SideEffectFree.class);
batchSupported = procClass.isAnnotationPresent(SupportsBatching.class);
triggeredSerially = procClass.isAnnotationPresent(TriggerSerially.class);
triggerWhenAnyDestinationAvailable = procClass.isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class);
eventDrivenSupported = procClass.isAnnotationPresent(EventDriven.class) && !triggeredSerially && !triggerWhenEmpty;
final boolean inputRequirementPresent = procClass.isAnnotationPresent(InputRequirement.class);
if (inputRequirementPresent) {
@ -371,20 +363,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
* @return the value of the processor's {@link CapabilityDescription}
* annotation, if one exists, else <code>null</code>.
*/
@SuppressWarnings("deprecation")
public String getProcessorDescription() {
final CapabilityDescription capDesc = processor.getClass().getAnnotation(CapabilityDescription.class);
String description = null;
if (capDesc != null) {
description = capDesc.value();
} else {
final org.apache.nifi.processor.annotation.CapabilityDescription deprecatedCapDesc = processor.getClass()
.getAnnotation(org.apache.nifi.processor.annotation.CapabilityDescription.class);
if (deprecatedCapDesc != null) {
description = deprecatedCapDesc.value();
}
}
return description;
}
@ -1254,8 +1238,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override
public Void call() throws Exception {
try (final NarCloseable nc = NarCloseable.withNarLoader()) {
ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class,
org.apache.nifi.processor.annotation.OnScheduled.class, processor, processContext);
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, processContext);
return null;
}
}

View File

@ -30,16 +30,12 @@ import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.annotation.OnConfigured;
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.ReflectionUtils;
public abstract class AbstractReportingTaskNode extends AbstractConfiguredComponent implements ReportingTaskNode {
@ -138,30 +134,11 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
@Override
public void setProperty(final String name, final String value) {
super.setProperty(name, value);
onConfigured();
}
@Override
public boolean removeProperty(String name) {
final boolean removed = super.removeProperty(name);
if (removed) {
onConfigured();
}
return removed;
}
@SuppressWarnings("deprecation")
private void onConfigured() {
// We need to invoke any method annotation with the OnConfigured annotation in order to
// maintain backward compatibility. This will be removed when we remove the old, deprecated annotations.
try (final NarCloseable x = NarCloseable.withNarLoader()) {
final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceLookup, getSchedulingPeriod());
ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, reportingTask, configContext);
} catch (final Exception e) {
throw new ComponentLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + reportingTask, e);
}
return super.removeProperty(name);
}
public boolean isDisabled() {

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.controller.repository;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -43,13 +42,11 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
*
*/
public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
private static final int MAX_LINEAGE_IDENTIFIERS = 100;
private final long id;
private final long entryDate;
private final long lineageStartDate;
private final long lineageStartIndex;
private final Set<String> lineageIdentifiers;
private final long size;
private final long penaltyExpirationMs;
private final Map<String, String> attributes;
@ -64,7 +61,6 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
this.entryDate = builder.bEntryDate;
this.lineageStartDate = builder.bLineageStartDate;
this.lineageStartIndex = builder.bLineageStartIndex;
this.lineageIdentifiers = builder.bLineageIdentifiers;
this.penaltyExpirationMs = builder.bPenaltyExpirationMs;
this.size = builder.bSize;
this.claim = builder.bClaim;
@ -83,11 +79,6 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
return entryDate;
}
@Override
public Set<String> getLineageIdentifiers() {
return Collections.unmodifiableSet(lineageIdentifiers);
}
@Override
public long getLineageStartDate() {
return lineageStartDate;
@ -196,25 +187,6 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
return this;
}
public Builder lineageIdentifiers(final Collection<String> lineageIdentifiers) {
if (null != lineageIdentifiers) {
bLineageIdentifiers.clear();
if (lineageIdentifiers.size() > MAX_LINEAGE_IDENTIFIERS) {
int i = 0;
for (final String id : lineageIdentifiers) {
bLineageIdentifiers.add(id);
if (i++ >= MAX_LINEAGE_IDENTIFIERS) {
break;
}
}
} else {
bLineageIdentifiers.addAll(lineageIdentifiers);
}
}
return this;
}
public Builder entryDate(final long epochMs) {
bEntryDate = epochMs;
return this;
@ -330,7 +302,6 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
bLineageStartDate = specFlowFile.getLineageStartDate();
bLineageStartIndex = specFlowFile.getLineageStartIndex();
bLineageIdentifiers.clear();
bLineageIdentifiers.addAll(specFlowFile.getLineageIdentifiers());
bPenaltyExpirationMs = specFlowFile.getPenaltyExpirationMillis();
bSize = specFlowFile.getSize();
bAttributes.putAll(specFlowFile.getAttributes());

View File

@ -1414,7 +1414,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
eventBuilder.setEventType(ProvenanceEventType.FORK);
eventBuilder.setFlowFileEntryDate(parent.getEntryDate());
eventBuilder.setLineageIdentifiers(parent.getLineageIdentifiers());
eventBuilder.setLineageStartDate(parent.getLineageStartDate());
eventBuilder.setFlowFileUUID(parent.getAttribute(CoreAttributes.UUID.key()));
@ -2618,9 +2617,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
newAttributes.put(key, value);
}
final Set<String> lineageIdentifiers = new HashSet<>(parent.getLineageIdentifiers());
lineageIdentifiers.add(parent.getAttribute(CoreAttributes.UUID.key()));
fFileBuilder.lineageIdentifiers(lineageIdentifiers);
fFileBuilder.lineageStart(parent.getLineageStartDate(), parent.getLineageStartIndex());
fFileBuilder.addAttributes(newAttributes);
@ -2646,8 +2642,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
long lineageStartDate = 0L;
final Set<String> lineageIdentifiers = new HashSet<>();
for (final FlowFile parent : parents) {
lineageIdentifiers.addAll(parent.getLineageIdentifiers());
lineageIdentifiers.add(parent.getAttribute(CoreAttributes.UUID.key()));
final long parentLineageStartDate = parent.getLineageStartDate();
if (lineageStartDate == 0L || parentLineageStartDate < lineageStartDate) {
@ -2669,7 +2663,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
.addAttributes(newAttributes)
.lineageIdentifiers(lineageIdentifiers)
.lineageStart(lineageStartDate, lineageStartIndex)
.build();

View File

@ -388,7 +388,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
}
private static class WriteAheadRecordSerde implements SerDe<RepositoryRecord> {
private static final int CURRENT_ENCODING_VERSION = 8;
private static final int CURRENT_ENCODING_VERSION = 9;
public static final byte ACTION_CREATE = 0;
public static final byte ACTION_UPDATE = 1;
@ -467,13 +467,6 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
out.writeLong(getRecordIdentifier(record));
out.writeLong(flowFile.getEntryDate());
final Set<String> lineageIdentifiers = flowFile.getLineageIdentifiers();
out.writeInt(lineageIdentifiers.size());
for (final String lineageId : lineageIdentifiers) {
out.writeUTF(lineageId);
}
out.writeLong(flowFile.getLineageStartDate());
out.writeLong(flowFile.getLineageStartIndex());
@ -549,13 +542,12 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
if (version > 1) {
// read the lineage identifiers and lineage start date, which were added in version 2.
final int numLineageIds = in.readInt();
final Set<String> lineageIdentifiers = new HashSet<>(numLineageIds);
for (int i = 0; i < numLineageIds; i++) {
lineageIdentifiers.add(in.readUTF());
if(version < 9){
final int numLineageIds = in.readInt();
for (int i = 0; i < numLineageIds; i++) {
in.readUTF(); //skip identifiers
}
}
ffBuilder.lineageIdentifiers(lineageIdentifiers);
final long lineageStartDate = in.readLong();
final long lineageStartIndex;
if (version > 7) {
@ -661,12 +653,12 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
if (version > 1) {
// read the lineage identifiers and lineage start date, which were added in version 2.
final int numLineageIds = in.readInt();
final Set<String> lineageIdentifiers = new HashSet<>(numLineageIds);
for (int i = 0; i < numLineageIds; i++) {
lineageIdentifiers.add(in.readUTF());
if(version < 9) {
final int numLineageIds = in.readInt();
for (int i = 0; i < numLineageIds; i++) {
in.readUTF(); //skip identifiers
}
}
ffBuilder.lineageIdentifiers(lineageIdentifiers);
final long lineageStartDate = in.readLong();
final long lineageStartIndex;

View File

@ -303,7 +303,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
} finally {
if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, worker, processContext);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, worker, processContext);
}
}

View File

@ -45,11 +45,8 @@ import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.annotation.OnConfigured;
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
@ -203,27 +200,11 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
@Override
public void setProperty(final String name, final String value) {
super.setProperty(name, value);
onConfigured();
}
@Override
public boolean removeProperty(String name) {
final boolean removed = super.removeProperty(name);
if (removed) {
onConfigured();
}
return removed;
}
@SuppressWarnings("deprecation")
private void onConfigured() {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceProvider, null);
ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, implementation, configContext);
} catch (final Exception e) {
throw new ComponentLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + implementation, e);
}
return super.removeProperty(name);
}
@Override

View File

@ -95,7 +95,7 @@ public class ContinuallyRunConnectableTask implements Callable<Boolean> {
} finally {
if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, connectable, processContext);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, connectable, processContext);
}
}

View File

@ -52,9 +52,7 @@ public class ReportingTaskWrapper implements Runnable {
// invoke the OnStopped methods
if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotations(
OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class,
taskNode.getReportingTask(), taskNode.getConfigurationContext());
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, taskNode.getReportingTask(), taskNode.getConfigurationContext());
}
}
} finally {

View File

@ -338,7 +338,7 @@ public final class StandardProcessGroup implements ProcessGroup {
for (final ProcessorNode node : procGroup.getProcessors()) {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
final StandardProcessContext processContext = new StandardProcessContext(node, controllerServiceProvider, encryptor, getStateManager(node.getIdentifier()));
ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, node.getProcessor(), processContext);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, node.getProcessor(), processContext);
}
}
@ -704,7 +704,7 @@ public final class StandardProcessGroup implements ProcessGroup {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor, getStateManager(processor.getIdentifier()));
ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnRemoved.class, org.apache.nifi.processor.annotation.OnRemoved.class, processor.getProcessor(), processContext);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext);
} catch (final Exception e) {
throw new ComponentLifeCycleException("Failed to invoke 'OnRemoved' methods of " + processor, e);
}

View File

@ -32,7 +32,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@ -110,7 +109,6 @@ public class TestFileSystemSwapManager {
assertEquals(pre.getContentClaimOffset(), post.getContentClaimOffset());
assertEquals(pre.getEntryDate(), post.getEntryDate());
assertEquals(pre.getLastQueueDate(), post.getLastQueueDate());
assertEquals(pre.getLineageIdentifiers(), post.getLineageIdentifiers());
assertEquals(pre.getLineageStartDate(), post.getLineageStartDate());
assertEquals(pre.getPenaltyExpirationMillis(), post.getPenaltyExpirationMillis());
}
@ -198,11 +196,6 @@ public class TestFileSystemSwapManager {
return lastQueueDate;
}
@Override
public Set<String> getLineageIdentifiers() {
return Collections.emptySet();
}
@Override
public boolean isPenalized() {
return false;

View File

@ -524,7 +524,7 @@ public class TestStandardFlowFileQueue {
public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException {
swapOutCalledCount++;
final String location = UUID.randomUUID().toString();
swappedOut.put(location, new ArrayList<FlowFileRecord>(flowFiles));
swappedOut.put(location, new ArrayList<>(flowFiles));
return location;
}
@ -560,7 +560,7 @@ public class TestStandardFlowFileQueue {
@Override
public List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) throws IOException {
return new ArrayList<String>(swappedOut.keySet());
return new ArrayList<>(swappedOut.keySet());
}
@Override
@ -610,7 +610,7 @@ public class TestStandardFlowFileQueue {
}
public TestFlowFile(final long size) {
this(new HashMap<String, String>(), size);
this(new HashMap<>(), size);
}
public TestFlowFile(final Map<String, String> attributes, final long size) {
@ -647,11 +647,6 @@ public class TestStandardFlowFileQueue {
return null;
}
@Override
public Set<String> getLineageIdentifiers() {
return Collections.emptySet();
}
@Override
public boolean isPenalized() {
return false;

View File

@ -118,7 +118,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
private static final String FILE_EXTENSION = ".prov";
private static final String TEMP_FILE_SUFFIX = ".prov.part";
private static final long PURGE_EVENT_MILLISECONDS = 2500L; //Determines the frequency over which the task to delete old events will occur
public static final int SERIALIZATION_VERSION = 8;
public static final int SERIALIZATION_VERSION = 9;
public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
public static final Pattern INDEX_PATTERN = Pattern.compile("index-\\d+");
public static final Pattern LOG_FILENAME_PATTERN = Pattern.compile("(\\d+).*\\.prov");

View File

@ -23,9 +23,7 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
@ -92,8 +90,8 @@ public class StandardRecordReader implements RecordReader {
final int serializationVersion = dis.readInt();
headerLength = repoClassName.getBytes(StandardCharsets.UTF_8).length + 2 + 4; // 2 bytes for string length, 4 for integer.
if (serializationVersion < 1 || serializationVersion > 8) {
throw new IllegalArgumentException("Unable to deserialize record because the version is " + serializationVersion + " and supported versions are 1-8");
if (serializationVersion < 1 || serializationVersion > 9) {
throw new IllegalArgumentException("Unable to deserialize record because the version is " + serializationVersion + " and supported versions are 1-9");
}
this.serializationVersion = serializationVersion;
@ -252,7 +250,6 @@ public class StandardRecordReader implements RecordReader {
final Map<String, String> attrs = readAttributes(dis, false);
builder.setFlowFileEntryDate(System.currentTimeMillis());
builder.setLineageIdentifiers(Collections.<String>emptySet());
builder.setLineageStartDate(-1L);
builder.setAttributes(Collections.<String, String>emptyMap(), attrs);
builder.setCurrentContentClaim(null, null, null, null, fileSize);
@ -288,10 +285,11 @@ public class StandardRecordReader implements RecordReader {
final Long flowFileEntryDate = dis.readLong();
builder.setEventDuration(dis.readLong());
final Set<String> lineageIdentifiers = new HashSet<>();
final int numLineageIdentifiers = dis.readInt();
for (int i = 0; i < numLineageIdentifiers; i++) {
lineageIdentifiers.add(readUUID(dis));
if (serializationVersion < 9){
final int numLineageIdentifiers = dis.readInt();
for (int i = 0; i < numLineageIdentifiers; i++) {
readUUID(dis); //skip identifiers
}
}
final long lineageStartDate = dis.readLong();
@ -358,7 +356,6 @@ public class StandardRecordReader implements RecordReader {
}
builder.setFlowFileEntryDate(flowFileEntryDate);
builder.setLineageIdentifiers(lineageIdentifiers);
builder.setLineageStartDate(lineageStartDate);
builder.setStorageLocation(filename, startOffset);

View File

@ -165,8 +165,6 @@ public class StandardRecordWriter implements RecordWriter {
out.writeLong(record.getEventTime());
out.writeLong(record.getFlowFileEntryDate());
out.writeLong(record.getEventDuration());
writeUUIDs(out, record.getLineageIdentifiers());
out.writeLong(record.getLineageStartDate());
writeNullableString(out, record.getComponentId());

View File

@ -93,10 +93,6 @@ public class IndexingAction {
doc.add(new LongField(SearchableFields.Identifier.getSearchableFieldName(), record.getEventId(), Store.YES));
}
for (final String lineageIdentifier : record.getLineageIdentifiers()) {
addField(doc, SearchableFields.LineageIdentifier, lineageIdentifier, Store.NO);
}
// If it's event is a FORK, or JOIN, add the FlowFileUUID for all child/parent UUIDs.
if (record.getEventType() == ProvenanceEventType.FORK || record.getEventType() == ProvenanceEventType.CLONE || record.getEventType() == ProvenanceEventType.REPLAY) {
for (final String uuid : record.getChildUuids()) {

View File

@ -47,7 +47,7 @@ public class LineageQuery {
private static final Logger logger = LoggerFactory.getLogger(LineageQuery.class);
public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final IndexManager indexManager, final File indexDirectory,
final String lineageIdentifier, final Collection<String> flowFileUuids, final int maxAttributeChars) throws IOException {
final String lineageIdentifier, final Collection<String> flowFileUuids, final int maxAttributeChars) throws IOException {
if (requireNonNull(flowFileUuids).size() > MAX_LINEAGE_UUIDS) {
throw new IllegalArgumentException(String.format("Cannot compute lineage for more than %s FlowFiles. This lineage contains %s.", MAX_LINEAGE_UUIDS, flowFileUuids.size()));
}
@ -73,25 +73,8 @@ public class LineageQuery {
flowFileIdQuery.setMinimumNumberShouldMatch(1);
}
BooleanQuery query;
if (lineageIdentifier == null) {
query = flowFileIdQuery;
} else {
final BooleanQuery lineageIdQuery = new BooleanQuery();
lineageIdQuery.add(new TermQuery(new Term(SearchableFields.LineageIdentifier.getSearchableFieldName(), lineageIdentifier)), Occur.MUST);
if (flowFileIdQuery == null) {
query = lineageIdQuery;
} else {
query = new BooleanQuery();
query.add(flowFileIdQuery, Occur.SHOULD);
query.add(lineageIdQuery, Occur.SHOULD);
query.setMinimumNumberShouldMatch(1);
}
}
final long searchStart = System.nanoTime();
final TopDocs uuidQueryTopDocs = searcher.search(query, MAX_QUERY_RESULTS);
final TopDocs uuidQueryTopDocs = searcher.search(flowFileIdQuery, MAX_QUERY_RESULTS);
final long searchEnd = System.nanoTime();
// Always authorized. We do this because we need to pull back the event, regardless of whether or not
@ -100,7 +83,7 @@ public class LineageQuery {
final DocsReader docsReader = new DocsReader();
final Set<ProvenanceEventRecord> recs = docsReader.read(uuidQueryTopDocs, authCheck, searcher.getIndexReader(), repo.getAllLogFiles(),
new AtomicInteger(0), Integer.MAX_VALUE, maxAttributeChars);
new AtomicInteger(0), Integer.MAX_VALUE, maxAttributeChars);
final long readDocsEnd = System.nanoTime();
logger.debug("Finished Lineage Query against {}; Lucene search took {} millis, reading records took {} millis",
@ -113,7 +96,7 @@ public class LineageQuery {
} catch (final FileNotFoundException fnfe) {
// nothing has been indexed yet, or the data has already aged off
logger.warn("Attempted to search Provenance Index {} but could not find the file due to {}", indexDirectory, fnfe);
if ( logger.isDebugEnabled() ) {
if (logger.isDebugEnabled()) {
logger.warn("", fnfe);
}

View File

@ -17,9 +17,7 @@
package org.apache.nifi.provenance;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.flowfile.FlowFile;
@ -38,11 +36,6 @@ public class TestUtil {
return System.currentTimeMillis();
}
@Override
public Set<String> getLineageIdentifiers() {
return new HashSet<String>();
}
@Override
public long getLineageStartDate() {
return System.currentTimeMillis();

View File

@ -51,7 +51,6 @@ import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
@ -709,11 +708,6 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository {
return record.getLineageStartDate();
}
@Override
public Set<String> getLineageIdentifiers() {
return record.getLineageIdentifiers();
}
@Override
public long getFileSize() {
return record.getFileSize();

View File

@ -27,10 +27,8 @@ import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
@ -133,11 +131,6 @@ public class TestVolatileProvenanceRepository {
return System.currentTimeMillis();
}
@Override
public Set<String> getLineageIdentifiers() {
return new HashSet<String>();
}
@Override
public long getLineageStartDate() {
return System.currentTimeMillis();

View File

@ -249,19 +249,9 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
addField(builder, "eventOrdinal", event.getEventId());
addField(builder, "eventType", event.getEventType().name());
addField(builder, "timestampMillis", event.getEventTime());
addField(builder, "timestamp", df.format(event.getEventTime()));
addField(builder, "durationMillis", event.getEventDuration());
addField(builder, "lineageStart", event.getLineageStartDate());
final Set<String> lineageIdentifiers = new HashSet<>();
if (event.getLineageIdentifiers() != null) {
lineageIdentifiers.addAll(event.getLineageIdentifiers());
}
lineageIdentifiers.add(event.getFlowFileUuid());
addField(builder, factory, "lineageIdentifiers", lineageIdentifiers);
addField(builder, "details", event.getDetails());
addField(builder, "componentId", event.getComponentId());
addField(builder, "componentType", event.getComponentType());

View File

@ -23,10 +23,8 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
@ -69,10 +67,6 @@ public class TestSiteToSiteProvenanceReportingTask {
final Map<String, String> prevAttrs = new HashMap<>();
attributes.put("filename", "1234.xyz");
final Set<String> lineageIdentifiers = new HashSet<>();
lineageIdentifiers.add("123");
lineageIdentifiers.add("321");
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
builder.setEventTime(System.currentTimeMillis());
builder.setEventType(ProvenanceEventType.RECEIVE);
@ -82,7 +76,6 @@ public class TestSiteToSiteProvenanceReportingTask {
builder.setAttributes(prevAttrs, attributes);
builder.setComponentId("1234");
builder.setComponentType("dummy processor");
builder.setLineageIdentifiers(lineageIdentifiers);
final ProvenanceEventRecord event = builder.build();
final List<byte[]> dataSent = new ArrayList<>();