mirror of https://github.com/apache/nifi.git
NIFI-13526 Removed unused variables and localized nifi-socket-utils
- Removed nifi-socket-utils and moved remaining classes to referencing framework components
This closes #9059
Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent 58b2af9a6e
commit f92d8f72a8
@@ -111,7 +111,6 @@ public class ConfigSchemaV2 extends BaseSchema implements ConvertableSchema<Conf
        List<String> allProcessorIds = allProcessGroups.stream().flatMap(p -> p.getProcessors().stream()).map(ProcessorSchema::getId).collect(Collectors.toList());
        List<String> allFunnelIds = allProcessGroups.stream().flatMap(p -> p.getFunnels().stream()).map(FunnelSchema::getId).collect(Collectors.toList());
        List<String> allConnectionIds = allConnectionSchemas.stream().map(ConnectionSchema::getId).collect(Collectors.toList());
        List<String> allRemoteProcessGroupNames = allRemoteProcessGroups.stream().map(RemoteProcessGroupSchemaV2::getName).collect(Collectors.toList());
        List<String> allRemoteInputPortIds = allRemoteProcessGroups.stream().filter(r -> r.getInputPorts() != null)
                .flatMap(r -> r.getInputPorts().stream()).map(RemotePortSchema::getId).collect(Collectors.toList());
        List<String> allInputPortIds = allProcessGroups.stream().flatMap(p -> p.getInputPortSchemas().stream()).map(PortSchema::getId).collect(Collectors.toList());

@@ -1,51 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.parameter;

import java.util.Collection;
import java.util.Objects;

public abstract class AbstractParameterGroup<T> {

    private final String groupName;
    private final Collection<T> items;

    /**
     * Creates a named parameter group.
     * @param groupName The parameter group name
     * @param items A collection of grouped items
     */
    protected AbstractParameterGroup(final String groupName, final Collection<T> items) {
        this.groupName = Objects.requireNonNull(groupName, "Group name is required");
        this.items = items;
    }

    /**
     * @return The group name
     */
    public String getGroupName() {
        return groupName;
    }

    /**
     * @return The collection of grouped items
     */
    public Collection<T> getItems() {
        return items;
    }
}

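Since AbstractParameterGroup is a plain generic holder, a concrete group only needs to forward a name and a collection to the constructor. A minimal sketch of how a subclass would look (the subclass and its element type are illustrative assumptions, not part of this commit):

import java.util.Collection;
import java.util.List;

// Hypothetical subclass for illustration; any element type can stand in for <T>.
class StringParameterGroup extends AbstractParameterGroup<String> {
    StringParameterGroup(final String groupName, final Collection<String> items) {
        super(groupName, items);
    }
}

// Usage: new StringParameterGroup("defaults", List.of("a", "b")).getGroupName() yields "defaults";
// a null groupName throws NullPointerException from Objects.requireNonNull in the superclass.
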
@@ -28,17 +28,11 @@ import java.util.Map;
 */
public interface EnvironmentVariables {

    /**
     * Returns an empty registry which can be used as a more intentional null
     * value.
     */
    public static final EnvironmentVariables EMPTY_ENVIRONMENT_VARIABLES = () -> Collections.emptyMap();

    /**
     * Provides a registry containing all environment variables and system
     * properties. System properties receive precedence.
     */
    public static final EnvironmentVariables ENVIRONMENT_VARIABLES = new EnvironmentVariables() {
    public final EnvironmentVariables ENVIRONMENT_VARIABLES = new EnvironmentVariables() {
        final Map<VariableDescriptor, String> map = new HashMap<>();

        {

@@ -372,11 +372,6 @@
            <artifactId>nifi-single-user-utils</artifactId>
            <version>2.0.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-socket-utils</artifactId>
            <version>2.0.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-utils</artifactId>

@@ -24,7 +24,6 @@ import org.apache.nifi.provenance.ProvenanceEventType;
public class EventNode implements ProvenanceEventLineageNode {

    private final ProvenanceEventRecord record;
    private String clusterNodeIdentifier = null;

    public EventNode(final ProvenanceEventRecord event) {
        this.record = event;

@@ -16,15 +16,11 @@
 */
package org.apache.nifi.parameter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ExpressionLanguageAgnosticParameterParser extends AbstractParameterParser {
    private static final Logger logger = LoggerFactory.getLogger(ExpressionLanguageAgnosticParameterParser.class);

    @Override
    public ParameterTokenList parseTokens(final String input) {

@@ -16,15 +16,11 @@
 */
package org.apache.nifi.parameter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ExpressionLanguageAwareParameterParser extends AbstractParameterParser implements ParameterParser {
    private static final Logger logger = LoggerFactory.getLogger(ExpressionLanguageAwareParameterParser.class);
    private static final char DOLLAR_SIGN = '$';

@@ -87,13 +87,6 @@ public class NiFiProperties extends ApplicationProperties {
    public static final String BACKPRESSURE_SIZE = "nifi.queue.backpressure.size";
    public static final String LISTENER_BOOTSTRAP_PORT = "nifi.listener.bootstrap.port";

    // Encryption Properties for all Repositories
    public static final String REPOSITORY_ENCRYPTION_PROTOCOL_VERSION = "nifi.repository.encryption.protocol.version";
    public static final String REPOSITORY_ENCRYPTION_KEY_ID = "nifi.repository.encryption.key.id";
    public static final String REPOSITORY_ENCRYPTION_KEY_PROVIDER = "nifi.repository.encryption.key.provider";
    public static final String REPOSITORY_ENCRYPTION_KEY_PROVIDER_KEYSTORE_LOCATION = "nifi.repository.encryption.key.provider.keystore.location";
    public static final String REPOSITORY_ENCRYPTION_KEY_PROVIDER_KEYSTORE_PASSWORD = "nifi.repository.encryption.key.provider.keystore.password";

    // content repository properties
    public static final String REPOSITORY_CONTENT_PREFIX = "nifi.content.repository.directory.";
    public static final String CONTENT_REPOSITORY_IMPLEMENTATION = "nifi.content.repository.implementation";

@@ -1579,10 +1572,6 @@ public class NiFiProperties extends ApplicationProperties {
                && getProperty(SECURITY_TRUSTSTORE_PASSWD) != null;
    }

    public String getRepositoryEncryptionKeyId() {
        return getProperty(REPOSITORY_ENCRYPTION_KEY_ID);
    }

    /**
     * Returns the allowed proxy hostnames (and IP addresses) as a List. The hosts have been normalized to the form {@code somehost.com}, {@code somehost.com:port}, or {@code 127.0.0.1}.
     *

@@ -1,49 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at
      http://www.apache.org/licenses/LICENSE-2.0
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-commons</artifactId>
        <version>2.0.0-SNAPSHOT</version>
    </parent>
    <artifactId>nifi-socket-utils</artifactId>
    <description>Utilities for socket communication</description>
    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-security-utils</artifactId>
            <version>2.0.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-security-utils-api</artifactId>
            <version>2.0.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
        </dependency>
    </dependencies>
</project>

@@ -1,162 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.io.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.nifi.io.nio.consumer.StreamConsumer;
import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;

import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractChannelReader implements Runnable {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractChannelReader.class);
    private final String uniqueId;
    private final SelectionKey key;
    private final BufferPool bufferPool;
    private final StreamConsumer consumer;
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final AtomicReference<ScheduledFuture<?>> future = new AtomicReference<>(null); // the future on which this reader runs...

    public AbstractChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory) {
        this.uniqueId = id;
        this.key = key;
        this.bufferPool = empties;
        this.consumer = consumerFactory.newInstance(id);
        consumer.setReturnBufferQueue(bufferPool);
    }

    protected void setScheduledFuture(final ScheduledFuture<?> future) {
        this.future.set(future);
    }

    protected ScheduledFuture<?> getScheduledFuture() {
        return future.get();
    }

    protected SelectionKey getSelectionKey() {
        return key;
    }

    public boolean isClosed() {
        return isClosed.get();
    }

    private void closeStream() {
        if (isClosed.get()) {
            return;
        }
        try {
            isClosed.set(true);
            future.get().cancel(false);
            key.cancel();
            key.channel().close();
        } catch (final IOException ioe) {
            LOGGER.warn("Unable to cleanly close stream", ioe);
        } finally {
            consumer.signalEndOfStream();
        }
    }

    /**
     * Allows a subclass to specifically handle how it reads from the given
     * key's channel into the given buffer.
     *
     * @param key of channel to read from
     * @param buffer to fill
     * @return the number of bytes read in the final read cycle. A value of zero
     * or more indicates the channel is still open but a value of -1 indicates
     * end of stream.
     * @throws IOException if reading from channel causes failure
     */
    protected abstract int fillBuffer(SelectionKey key, ByteBuffer buffer) throws IOException;

    @Override
    public final void run() {
        if (!key.isValid() || consumer.isConsumerFinished()) {
            closeStream();
            return;
        }
        if (!key.isReadable()) {
            return; // there is nothing available to read...or we aren't allowed to read due to throttling
        }
        ByteBuffer buffer = null;
        try {
            buffer = bufferPool.poll();
            if (buffer == null) {
                return; // no buffers available - come back later
            }
            final int bytesRead = fillBuffer(key, buffer);
            buffer.flip();
            if (buffer.remaining() > 0) {
                consumer.addFilledBuffer(buffer);
                buffer = null; // clear the reference - it is now the consumer's responsibility
            } else {
                buffer.clear();
                bufferPool.returnBuffer(buffer, 0);
                buffer = null; // clear the reference - it is now back in the queue
            }
            if (bytesRead < 0) { // we've reached the end
                closeStream();
            }
        } catch (final Exception ioe) {
            closeStream();
            LOGGER.error("Closed channel reader {}", this, ioe);
        } finally {
            if (buffer != null) {
                buffer.clear();
                bufferPool.returnBuffer(buffer, 0);
            }
        }
    }

    @Override
    public final boolean equals(final Object obj) {
        if (obj == null) {
            return false;
        }
        if (obj == this) {
            return true;
        }
        if (obj.getClass() != getClass()) {
            return false;
        }
        AbstractChannelReader rhs = (AbstractChannelReader) obj;
        return new EqualsBuilder().appendSuper(super.equals(obj)).append(uniqueId, rhs.uniqueId).isEquals();
    }

    @Override
    public final int hashCode() {
        return new HashCodeBuilder(17, 37).append(uniqueId).toHashCode();
    }

    @Override
    public final String toString() {
        return new ToStringBuilder(this, ToStringStyle.NO_FIELD_NAMES_STYLE).append(uniqueId).toString();
    }
}

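The fillBuffer contract above (zero or more bytes means the channel is still open, -1 means end of stream) is what the concrete readers removed later in this commit implement. A minimal sketch of a subclass honoring that contract, assuming the selected channel implements ReadableByteChannel (illustrative only, not part of the commit):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;

// Illustrative reader; SocketChannelReader below is the real TCP implementation.
class SingleReadChannelReader extends AbstractChannelReader {
    SingleReadChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory factory) {
        super(id, key, empties, factory);
    }

    @Override
    protected int fillBuffer(final SelectionKey key, final ByteBuffer buffer) throws IOException {
        // a single read: returns -1 at end of stream, matching the documented contract
        return ((ReadableByteChannel) key.channel()).read(buffer);
    }
}
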
@@ -1,110 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.io.nio;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BufferPool implements Runnable {

    private static final Logger LOGGER = LoggerFactory.getLogger(BufferPool.class);
    final BlockingQueue<ByteBuffer> bufferPool;
    private final static double ONE_MB = 1 << 20;
    private Calendar lastRateSampleTime = Calendar.getInstance();
    private final Calendar startTime = Calendar.getInstance();
    double lastRateSampleMBps = -1.0;
    double overallMBps = -1.0;
    private long totalBytesExtracted = 0L;
    private long lastTotalBytesExtracted = 0L;
    final double maxRateMBps;

    public BufferPool(final int bufferCount, final int bufferCapacity, final boolean allocateDirect, final double maxRateMBps) {
        bufferPool = new LinkedBlockingDeque<>(BufferPool.createBuffers(bufferCount, bufferCapacity, allocateDirect));
        this.maxRateMBps = maxRateMBps;
    }

    /**
     * Returns the given buffer to the pool - and clears it.
     *
     * @param buffer buffer to return
     * @param bytesProcessed bytes processed for this buffer being returned
     * @return true if buffer returned to pool
     */
    public synchronized boolean returnBuffer(ByteBuffer buffer, final int bytesProcessed) {
        totalBytesExtracted += bytesProcessed;
        buffer.clear();
        return bufferPool.add(buffer);
    }

    // here we enforce the desired rate by restricting access to buffers when we're over the configured rate
    public synchronized ByteBuffer poll() {
        computeRate();
        final double weightedAvg = (lastRateSampleMBps * 0.7) + (overallMBps * 0.3);
        if (overallMBps >= maxRateMBps || weightedAvg >= maxRateMBps) {
            return null;
        }
        return bufferPool.poll();
    }

    public int size() {
        return bufferPool.size();
    }

    private synchronized void computeRate() {
        final Calendar now = Calendar.getInstance();
        final long measurementDurationMillis = now.getTimeInMillis() - lastRateSampleTime.getTimeInMillis();
        final double durationSecs = ((double) measurementDurationMillis) / 1000.0;
        if (durationSecs >= 0.75) { // recompute every 3/4 second or when we're too fast
            final long totalDurationMillis = now.getTimeInMillis() - startTime.getTimeInMillis();
            final double totalDurationSecs = ((double) totalDurationMillis) / 1000.0;
            final long differenceBytes = totalBytesExtracted - lastTotalBytesExtracted;
            lastTotalBytesExtracted = totalBytesExtracted;
            lastRateSampleTime = now;
            final double bps = ((double) differenceBytes) / durationSecs;
            final double totalBps = ((double) totalBytesExtracted / totalDurationSecs);
            lastRateSampleMBps = bps / ONE_MB;
            overallMBps = totalBps / ONE_MB;
        }
    }

    public static List<ByteBuffer> createBuffers(final int bufferCount, final int bufferCapacity, final boolean allocateDirect) {
        final List<ByteBuffer> buffers = new ArrayList<>();
        for (int i = 0; i < bufferCount; i++) {
            final ByteBuffer buffer = (allocateDirect) ? ByteBuffer.allocateDirect(bufferCapacity) : ByteBuffer.allocate(bufferCapacity);
            buffers.add(buffer);
        }
        return buffers;
    }

    private void logChannelReadRates() {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("Overall rate= %,.4f MB/s / Current Rate= %,.4f MB/s / Total Bytes Read= %d", overallMBps, lastRateSampleMBps, totalBytesExtracted));
        }
    }

    @Override
    public void run() {
        computeRate();
        logChannelReadRates();
    }
}

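poll() above doubles as the throttle: it returns null whenever the sampled or overall MB/s reaches maxRateMBps, so callers must treat null as "back off and retry later", which is what AbstractChannelReader.run() does. A short usage sketch under that assumption (pool sizing and rate are illustrative values, not from the commit):

import java.nio.ByteBuffer;
import org.apache.nifi.io.nio.BufferPool;

class BufferPoolDemo {
    public static void main(final String[] args) {
        // 8 heap buffers of 4 KiB each, throttled to roughly 10 MB/s (illustrative values)
        final BufferPool pool = new BufferPool(8, 4096, false, 10.0);
        final ByteBuffer buffer = pool.poll();
        if (buffer == null) {
            return; // over the configured rate or no buffers free - skip this cycle and retry later
        }
        buffer.put(new byte[]{1, 2, 3}); // stand-in for a channel read filling the buffer
        // report the bytes handled so the pool's rate samples stay accurate
        pool.returnBuffer(buffer, buffer.position());
    }
}
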
@@ -1,158 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.io.nio;

import java.io.IOException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ChannelDispatcher implements Runnable {

    private static final Logger LOGGER = LoggerFactory.getLogger(ChannelDispatcher.class);
    private final Selector serverSocketSelector;
    private final Selector socketChannelSelector;
    private final ScheduledExecutorService executor;
    private final BufferPool emptyBuffers;
    private final StreamConsumerFactory factory;
    private final AtomicLong channelReaderFrequencyMilliseconds = new AtomicLong(DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS);
    private final long timeout;
    private final boolean readSingleDatagram;
    private volatile boolean stop = false;
    public static final long DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS = 100L;

    public ChannelDispatcher(final Selector serverSocketSelector, final Selector socketChannelSelector, final ScheduledExecutorService service,
            final StreamConsumerFactory factory, final BufferPool buffers, final long timeout, final TimeUnit unit, final boolean readSingleDatagram) {
        this.serverSocketSelector = serverSocketSelector;
        this.socketChannelSelector = socketChannelSelector;
        this.executor = service;
        this.factory = factory;
        emptyBuffers = buffers;
        this.timeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
        this.readSingleDatagram = readSingleDatagram;
    }

    public void setChannelReaderFrequency(final long period, final TimeUnit timeUnit) {
        channelReaderFrequencyMilliseconds.set(TimeUnit.MILLISECONDS.convert(period, timeUnit));
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                selectServerSocketKeys();
                selectSocketChannelKeys();
            } catch (final Exception ex) {
                LOGGER.warn("Key selection failed: Normal during shutdown.", ex);
            }
        }
    }

    /*
     * When ServerSocketChannels are registered with the selector, we want each invocation of this method to loop through all
     * channels' keys.
     *
     * @throws IOException if unable to select keys
     */
    private void selectServerSocketKeys() throws IOException {
        int numSelected = serverSocketSelector.select(timeout);
        if (numSelected == 0) {
            return;
        }

        // for each registered server socket - see if any connections are waiting to be established
        final Iterator<SelectionKey> itr = serverSocketSelector.selectedKeys().iterator();
        while (itr.hasNext()) {
            SelectionKey serverSocketkey = itr.next();
            final SelectableChannel channel = serverSocketkey.channel();
            AbstractChannelReader reader = null;
            if (serverSocketkey.isValid() && serverSocketkey.isAcceptable()) {
                final ServerSocketChannel ssChannel = (ServerSocketChannel) serverSocketkey.channel();
                final SocketChannel sChannel = ssChannel.accept();
                if (sChannel != null) {
                    sChannel.configureBlocking(false);
                    final SelectionKey socketChannelKey = sChannel.register(socketChannelSelector, SelectionKey.OP_READ);
                    final String readerId = sChannel.socket().toString();
                    reader = new SocketChannelReader(readerId, socketChannelKey, emptyBuffers, factory);
                    final ScheduledFuture<?> readerFuture = executor.scheduleWithFixedDelay(reader, 10L,
                            channelReaderFrequencyMilliseconds.get(), TimeUnit.MILLISECONDS);
                    reader.setScheduledFuture(readerFuture);
                    socketChannelKey.attach(reader);
                }
            }
            itr.remove(); // do this so that the next select operation returns a positive value; otherwise, it will return 0.
            if (reader != null && LOGGER.isDebugEnabled()) {
                LOGGER.debug("{} New Connection established. Server channel: {} Reader: {}", this, channel, reader);
            }
        }
    }

    /*
     * When invoking this method, we only want to iterate through the selected keys once. When a key is entered into the selector's
     * selected key set, select will return a positive value. The next select will return 0 if nothing has changed. Note that
     * the selected key set is not manually changed via a remove operation.
     *
     * @throws IOException if unable to select keys
     */
    private void selectSocketChannelKeys() throws IOException {
        // once a channel associated with a key in this selector is 'ready', it causes this select to immediately return.
        // thus, for each trip through the run() we only get hit with one real timeout...the one in selectServerSocketKeys.
        int numSelected = socketChannelSelector.select(timeout);
        if (numSelected == 0) {
            return;
        }

        for (SelectionKey socketChannelKey : socketChannelSelector.selectedKeys()) {
            final SelectableChannel channel = socketChannelKey.channel();
            AbstractChannelReader reader = null;
            // there are 2 kinds of channels in this selector, both of which have their own readers and are executed in their own
            // threads. We will get here whenever a new SocketChannel is created due to an incoming connection. However,
            // for a DatagramChannel we don't want to create a new reader unless it is a new DatagramChannel. The only
            // way to tell if it's new is the lack of an attachment.
            if (channel instanceof DatagramChannel && socketChannelKey.attachment() == null) {
                reader = new DatagramChannelReader(UUID.randomUUID().toString(), socketChannelKey, emptyBuffers, factory, readSingleDatagram);
                socketChannelKey.attach(reader);
                final ScheduledFuture<?> readerFuture = executor.scheduleWithFixedDelay(reader, 10L, channelReaderFrequencyMilliseconds.get(),
                        TimeUnit.MILLISECONDS);
                reader.setScheduledFuture(readerFuture);
            }
            if (reader != null && LOGGER.isDebugEnabled()) {
                LOGGER.debug("{} New Connection established. Server channel: {} Reader: {}", this, channel, reader);
            }
        }
    }

    public void stop() {
        stop = true;
    }
}

@@ -1,225 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.io.nio;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class provides the entry point to NIO based socket listeners for NiFi
 * processors and services. There are 2 supported types of Listeners, Datagram
 * (UDP based transmissions) and ServerSocket (TCP based transmissions). This
 * will create the ChannelDispatcher, which is a Runnable and is controlled via
 * the ScheduledExecutorService, which is also created by this class. The
 * ChannelDispatcher handles connections to the ServerSocketChannels and creates
 * the readers associated with the resulting SocketChannels. Additionally, this
 * creates and manages two Selectors, one for ServerSocketChannels and another
 * for SocketChannels and DatagramChannels.
 *
 * The threading model for this consists of one thread for the
 * ChannelDispatcher, one thread per added SocketChannel reader, one thread per
 * added DatagramChannel reader. The ChannelDispatcher is not scheduled with
 * fixed delay as the others are. It is throttled by the provided timeout value.
 * Within the ChannelDispatcher there are two blocking operations which will
 * block for the given timeout each time through the enclosing loop.
 *
 * All channels are cached in one of the two Selectors via their SelectionKey.
 * The serverSocketSelector maintains all the added ServerSocketChannels; the
 * socketChannelSelector maintains all the added DatagramChannels and the
 * created SocketChannels. Further, the SelectionKey of the DatagramChannel and
 * the SocketChannel is injected with the channel's associated reader.
 *
 * All ChannelReaders will get throttled by the unavailability of buffers in the
 * provided BufferPool. This is designed to create back pressure.
 *
 */
public final class ChannelListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(ChannelListener.class);
    private final ScheduledExecutorService executor;
    private final Selector serverSocketSelector; // used to listen for new connections
    private final Selector socketChannelSelector; // used to listen on existing connections
    private final ChannelDispatcher channelDispatcher;
    private final BufferPool bufferPool;
    private final int initialBufferPoolSize;
    private volatile long channelReaderFrequencyMSecs = 50;

    public ChannelListener(final int threadPoolSize, final StreamConsumerFactory consumerFactory, final BufferPool bufferPool, int timeout,
            TimeUnit unit, final boolean readSingleDatagram) throws IOException {
        this.executor = Executors.newScheduledThreadPool(threadPoolSize + 1); // need to allow for long running ChannelDispatcher thread
        this.serverSocketSelector = Selector.open();
        this.socketChannelSelector = Selector.open();
        this.bufferPool = bufferPool;
        this.initialBufferPoolSize = bufferPool.size();
        channelDispatcher = new ChannelDispatcher(serverSocketSelector, socketChannelSelector, executor, consumerFactory, bufferPool,
                timeout, unit, readSingleDatagram);
        executor.schedule(channelDispatcher, 50, TimeUnit.MILLISECONDS);
    }

    public void setChannelReaderSchedulingPeriod(final long period, final TimeUnit unit) {
        channelReaderFrequencyMSecs = TimeUnit.MILLISECONDS.convert(period, unit);
        channelDispatcher.setChannelReaderFrequency(period, unit);
    }

    /**
     * Adds a server socket channel for listening to connections.
     *
     * @param nicIPAddress - if null binds to wildcard address
     * @param port - port to bind to
     * @param receiveBufferSize - size of OS receive buffer to request. If less
     * than 0 then it will not be set and the OS default will win.
     * @throws IOException if unable to add socket
     */
    public void addServerSocket(final InetAddress nicIPAddress, final int port, final int receiveBufferSize)
            throws IOException {
        final ServerSocketChannel ssChannel = ServerSocketChannel.open();
        ssChannel.configureBlocking(false);
        if (receiveBufferSize > 0) {
            ssChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
            final int actualReceiveBufSize = ssChannel.getOption(StandardSocketOptions.SO_RCVBUF);
            if (actualReceiveBufSize < receiveBufferSize) {
                LOGGER.warn("{} attempted to set TCP Receive Buffer Size to {} bytes but could only set to {} bytes. You may want to consider changing the Operating System's maximum receive buffer",
                        this, receiveBufferSize, actualReceiveBufSize);
            }
        }
        ssChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        ssChannel.bind(new InetSocketAddress(nicIPAddress, port));
        ssChannel.register(serverSocketSelector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Binds to listen for datagrams on the given local IPAddress/port
     *
     * @param nicIPAddress - if null will listen on wildcard address, which
     * means datagrams will be received on all local network interfaces.
     * Otherwise, will bind to the provided IP address associated with some NIC.
     * @param port - the port to listen on
     * @param receiveBufferSize - the number of bytes to request for a receive
     * buffer from OS
     * @throws IOException if unable to add channel
     */
    public void addDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize)
            throws IOException {
        final DatagramChannel dChannel = createAndBindDatagramChannel(nicIPAddress, port, receiveBufferSize);
        dChannel.register(socketChannelSelector, SelectionKey.OP_READ);
    }

    /**
     * Binds to listen for datagrams on the given local IPAddress/port and
     * restricts receipt of datagrams to those from the provided host and port;
     * must specify both. This improves performance for datagrams coming from a
     * sender that is known a-priori.
     *
     * @param nicIPAddress - if null will listen on wildcard address, which
     * means datagrams will be received on all local network interfaces.
     * Otherwise, will bind to the provided IP address associated with some NIC.
     * @param port - the port to listen on. This is used to provide a well-known
     * destination for a sender.
     * @param receiveBufferSize - the number of bytes to request for a receive
     * buffer from OS
     * @param sendingHost - the hostname, or IP address, of the sender of
     * datagrams. Only datagrams from this host will be received. If this is
     * null the wildcard ip is used, which means datagrams may be received from
     * any network interface on the local host.
     * @param sendingPort - the port used by the sender of datagrams. Only
     * datagrams from this port will be received.
     * @throws IOException if unable to add channel
     */
    public void addDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize, final String sendingHost,
            final Integer sendingPort) throws IOException {

        if (sendingHost == null || sendingPort == null) {
            addDatagramChannel(nicIPAddress, port, receiveBufferSize);
            return;
        }
        final DatagramChannel dChannel = createAndBindDatagramChannel(nicIPAddress, port, receiveBufferSize);
        dChannel.connect(new InetSocketAddress(sendingHost, sendingPort));
        dChannel.register(socketChannelSelector, SelectionKey.OP_READ);
    }

    private DatagramChannel createAndBindDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize)
            throws IOException {
        final DatagramChannel dChannel = DatagramChannel.open();
        dChannel.configureBlocking(false);
        if (receiveBufferSize > 0) {
            dChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
            final int actualReceiveBufSize = dChannel.getOption(StandardSocketOptions.SO_RCVBUF);
            if (actualReceiveBufSize < receiveBufferSize) {
                LOGGER.warn("{} attempted to set UDP Receive Buffer Size to {} bytes but could only set to {} bytes. You may want to consider changing the Operating System's maximum receive buffer",
                        this, receiveBufferSize, actualReceiveBufSize);
            }
        }
        dChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        dChannel.bind(new InetSocketAddress(nicIPAddress, port));
        return dChannel;
    }

    public void shutdown(final long period, final TimeUnit timeUnit) {
        channelDispatcher.stop();
        for (SelectionKey selectionKey : socketChannelSelector.keys()) {
            final AbstractChannelReader reader = (AbstractChannelReader) selectionKey.attachment();
            selectionKey.cancel();
            if (reader != null) {
                while (!reader.isClosed()) {
                    try {
                        Thread.sleep(channelReaderFrequencyMSecs);
                    } catch (InterruptedException e) {
                        // keep waiting for the reader to close
                    }
                }
                final ScheduledFuture<?> readerFuture = reader.getScheduledFuture();
                readerFuture.cancel(false);
            }
            IOUtils.closeQuietly(selectionKey.channel()); // should already be closed via reader, but if reader did not exist...
        }
        IOUtils.closeQuietly(socketChannelSelector);

        for (SelectionKey selectionKey : serverSocketSelector.keys()) {
            selectionKey.cancel();
            IOUtils.closeQuietly(selectionKey.channel());
        }
        IOUtils.closeQuietly(serverSocketSelector);
        executor.shutdown();
        try {
            executor.awaitTermination(period, timeUnit);
        } catch (final InterruptedException ex) {
            LOGGER.warn("Interrupted while trying to shutdown executor");
        }
        final int currentBufferPoolSize = bufferPool.size();
        if (currentBufferPoolSize != initialBufferPoolSize) {
            LOGGER.info("Channel listener shutdown. Initial buffer count={} Current buffer count={} Could indicate a buffer leak. Ensure all consumers are executed until they complete.",
                    initialBufferPoolSize, currentBufferPoolSize);
        } else {
            LOGGER.info("Channel listener shutdown.");
        }
    }
}

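The Javadoc above positions ChannelListener as the single entry point: construct it with a BufferPool and a StreamConsumerFactory, add sockets, and shut it down. A minimal TCP wiring sketch using only the constructor and methods visible in this diff (thread count, port, buffer sizing, and the consumer are illustrative assumptions; PrintingStreamConsumer is a sketch shown later in this diff):

import java.util.concurrent.TimeUnit;
import org.apache.nifi.io.nio.BufferPool;
import org.apache.nifi.io.nio.ChannelListener;
import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;

class ChannelListenerDemo {
    public static void main(final String[] args) throws Exception {
        // effectively unthrottled pool: 16 direct 64 KiB buffers (illustrative sizing)
        final BufferPool pool = new BufferPool(16, 65536, true, Double.MAX_VALUE);
        // any StreamConsumer implementation works here (hypothetical consumer, defined below)
        final StreamConsumerFactory consumerFactory = id -> new PrintingStreamConsumer(id);
        final ChannelListener listener = new ChannelListener(4, consumerFactory, pool, 2, TimeUnit.SECONDS, false);
        listener.addServerSocket(null, 9999, 1 << 20); // null address binds the wildcard interface
        Thread.sleep(60_000); // serve for a minute (stand-in for real lifecycle management)
        listener.shutdown(5, TimeUnit.SECONDS);
    }
}
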
@@ -1,59 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.io.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;

import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;

public final class DatagramChannelReader extends AbstractChannelReader {

    public static final int MAX_UDP_PACKET_SIZE = 65507;

    private final boolean readSingleDatagram;

    public DatagramChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory,
            final boolean readSingleDatagram) {
        super(id, key, empties, consumerFactory);
        this.readSingleDatagram = readSingleDatagram;
    }

    /**
     * Will receive UDP data from the channel and won't receive anything unless
     * the given buffer has enough space for at least one full max UDP packet.
     *
     * @param key selection key
     * @param buffer to fill
     * @return bytes read
     * @throws IOException if error filling buffer from channel
     */
    @Override
    protected int fillBuffer(final SelectionKey key, final ByteBuffer buffer) throws IOException {
        final DatagramChannel dChannel = (DatagramChannel) key.channel();
        final int initialBufferPosition = buffer.position();
        while (buffer.remaining() > MAX_UDP_PACKET_SIZE && key.isValid() && key.isReadable()) {
            if (dChannel.receive(buffer) == null || readSingleDatagram) {
                break;
            }
        }
        return buffer.position() - initialBufferPosition;
    }
}

@@ -1,51 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.io.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;

public final class SocketChannelReader extends AbstractChannelReader {

    public SocketChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory) {
        super(id, key, empties, consumerFactory);
    }

    /**
     * Receives TCP data from the socket channel for the given key.
     *
     * @param key selection key
     * @param buffer byte buffer to fill
     * @return bytes read
     * @throws IOException if error reading bytes
     */
    @Override
    protected int fillBuffer(final SelectionKey key, final ByteBuffer buffer) throws IOException {
        int bytesRead = 0;
        final SocketChannel sChannel = (SocketChannel) key.channel();
        while (key.isValid() && key.isReadable()) {
            bytesRead = sChannel.read(buffer);
            if (bytesRead <= 0) {
                break;
            }
        }
        return bytesRead;
    }
}

@@ -1,131 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.io.nio.consumer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.nifi.io.nio.BufferPool;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;

public abstract class AbstractStreamConsumer implements StreamConsumer {

    private final String uniqueId;
    private BufferPool bufferPool = null;
    private final BlockingQueue<ByteBuffer> filledBuffers = new LinkedBlockingQueue<>();
    private final AtomicBoolean streamEnded = new AtomicBoolean(false);
    private final AtomicBoolean consumerEnded = new AtomicBoolean(false);

    public AbstractStreamConsumer(final String id) {
        uniqueId = id;
    }

    @Override
    public final void setReturnBufferQueue(final BufferPool returnQueue) {
        bufferPool = returnQueue;
    }

    @Override
    public final void addFilledBuffer(final ByteBuffer buffer) {
        if (isConsumerFinished()) {
            buffer.clear();
            bufferPool.returnBuffer(buffer, buffer.remaining());
        } else {
            filledBuffers.add(buffer);
        }
    }

    @Override
    public final void process() throws IOException {
        if (isConsumerFinished()) {
            return;
        }
        if (streamEnded.get() && filledBuffers.isEmpty()) {
            consumerEnded.set(true);
            onConsumerDone();
            return;
        }
        final ByteBuffer buffer = filledBuffers.poll();
        if (buffer != null) {
            final int bytesToProcess = buffer.remaining();
            try {
                processBuffer(buffer);
            } finally {
                buffer.clear();
                bufferPool.returnBuffer(buffer, bytesToProcess);
            }
        }
    }

    protected abstract void processBuffer(ByteBuffer buffer) throws IOException;

    @Override
    public final void signalEndOfStream() {
        streamEnded.set(true);
    }

    /**
     * Convenience method that is called when the consumer is done processing,
     * i.e. once it has been signaled end of stream and has processed all
     * given buffers.
     */
    protected void onConsumerDone() {
    }

    @Override
    public final boolean isConsumerFinished() {
        return consumerEnded.get();
    }

    @Override
    public final String getId() {
        return uniqueId;
    }

    @Override
    public final boolean equals(final Object obj) {
        if (obj == null) {
            return false;
        }
        if (obj == this) {
            return true;
        }
        if (obj.getClass() != getClass()) {
            return false;
        }
        AbstractStreamConsumer rhs = (AbstractStreamConsumer) obj;
        return new EqualsBuilder().appendSuper(super.equals(obj)).append(uniqueId, rhs.uniqueId).isEquals();
    }

    @Override
    public final int hashCode() {
        return new HashCodeBuilder(19, 23).append(uniqueId).toHashCode();
    }

    @Override
    public final String toString() {
        return new ToStringBuilder(this, ToStringStyle.NO_FIELD_NAMES_STYLE).append(uniqueId).toString();
    }
}

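AbstractStreamConsumer takes care of buffer queueing, pool return, and end-of-stream bookkeeping, so a concrete consumer only implements processBuffer. A minimal sketch (illustrative, not part of this commit) that decodes each filled buffer as UTF-8:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative consumer that prints each filled buffer as UTF-8 text.
class PrintingStreamConsumer extends AbstractStreamConsumer {
    PrintingStreamConsumer(final String id) {
        super(id);
    }

    @Override
    protected void processBuffer(final ByteBuffer buffer) throws IOException {
        final byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
        // the base class clears the buffer and returns it to the BufferPool
        // in the finally block of process(), so only the bytes need consuming here
    }
}
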
@@ -1,80 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.io.nio.consumer;

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.nifi.io.nio.BufferPool;

/**
 * A StreamConsumer must be thread safe. It may be accessed concurrently by a
 * thread providing data to process and another thread that is processing that
 * data.
 *
 */
public interface StreamConsumer {

    /**
     * Will be called once just after construction. It provides the queue to
     * which processed, emptied, and cleared buffers must be returned. For
     * each time <code>addFilledBuffer</code> is called there should be an
     * associated add to this given queue. If not, buffers will run out and all
     * stream processing will halt. READ THIS!!!
     *
     * @param returnQueue pool of buffers to use
     */
    void setReturnBufferQueue(BufferPool returnQueue);

    /**
     * Will be called by the thread that produces byte buffers with available
     * data to be processed. If the consumer is finished this should simply
     * return the given buffer to the return buffer queue (after it is cleared)
     *
     * @param buffer filled buffer
     */
    void addFilledBuffer(ByteBuffer buffer);

    /**
     * Will be called by the thread that executes the consumption of data. May
     * be called many times, though once <code>isConsumerFinished</code> returns
     * true this method will likely do nothing.
     *
     * @throws java.io.IOException if there is an issue processing
     */
    void process() throws IOException;

    /**
     * Called once the end of the input stream is detected
     */
    void signalEndOfStream();

    /**
     * If true signals the consumer is done consuming data and will not process
     * any more buffers.
     *
     * @return true if finished
     */
    boolean isConsumerFinished();

    /**
     * Uniquely identifies the consumer
     *
     * @return identifier of consumer
     */
    String getId();

}

@@ -1,26 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.io.nio.consumer;

public interface StreamConsumerFactory {

    StreamConsumer newInstance(String streamId);

}

@@ -55,7 +55,6 @@
        <module>nifi-security-utils</module>
        <module>nifi-single-user-utils</module>
        <module>nifi-site-to-site-client</module>
        <module>nifi-socket-utils</module>
        <module>nifi-swagger-integration</module>
        <module>nifi-utils</module>
        <module>nifi-uuid5</module>

@@ -24,7 +24,6 @@ public class GenerateAttachment {
    String to;
    String subject;
    String message;
    String hostName;

    private static final String NEWLINE = "\n";

@@ -35,7 +34,6 @@ public class GenerateAttachment {
        this.to = to;
        this.subject = subject;
        this.message = message;
        this.hostName = hostName;
    }

    public byte[] simpleMessage() {

@@ -96,15 +96,10 @@ public class TestJdbcTypesH2 {

        st.executeUpdate(createTable);

        // st.executeUpdate("insert into users (email, password, activation_code, forgotten_password_code, forgotten_password_time, created, active, home_module_id) "
        //         + " values ('robert.gates@cold.com', '******', 'CAS', 'ounou', '2005-12-09', '2005-12-03', 1, 5)");

        st.executeUpdate("insert into users (email, password, activation_code, created, active, somebinary, somebinary2, somebinary3, someblob, someclob) "
                + " values ('mari.gates@cold.com', '******', 'CAS', '2005-12-03', 3, 0x66FF, 'ABDF', 'EE64', 'BB22', 'CC88')");

        final ResultSet resultSet = st.executeQuery("select U.*, ROW_NUMBER() OVER () as rownr from users U");
        // final ResultSet resultSet = st.executeQuery("select U.active from users U");
        // final ResultSet resultSet = st.executeQuery("select U.somebinary from users U");

        final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
        JdbcCommon.convertToAvroStream(resultSet, outStream, false);

@@ -47,10 +47,4 @@ public class BigQueryAttributes {
    // Batch Attributes
    public static final String JOB_NB_RECORDS_ATTR = "bq.records.count";
    public static final String JOB_NB_RECORDS_DESC = "Number of records successfully inserted";

    public static final String JOB_ERROR_MSG_ATTR = "bq.error.message";

    public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason";

    public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location";
}

@ -23,12 +23,10 @@ import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.MockProcessorInitializationContext;
@ -69,7 +67,6 @@ public class ExecuteGroovyScriptTest {
    protected static DBCPService dbcp = null; //to make single initialization
    protected MockRecordParser recordParser = null;
    protected RecordSetWriterFactory recordWriter = null;
    protected RecordSchema recordSchema = null;
    protected ExecuteGroovyScript proc;
    public final String TEST_RESOURCE_LOCATION = "target/test/resources/groovy/";
    private final String TEST_CSV_DATA = "gender,title,first,last\n"
@ -135,7 +132,6 @@ public class ExecuteGroovyScriptTest {
                new RecordField("id", RecordFieldType.INT.getDataType()),
                new RecordField("name", RecordFieldType.STRING.getDataType()),
                new RecordField("code", RecordFieldType.INT.getDataType()));
        recordSchema = new SimpleRecordSchema(recordFields);

        recordParser = new MockRecordParser();
        recordFields.forEach((r) -> recordParser.addSchemaField(r));
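For context, a sketch of how a MockRecordParser built this way is typically fed rows once its schema fields are registered (illustrative values; addRecord takes one value per field, in field order):

import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordFieldType;

// Sketch only: mirrors the schema registered in the test setup above,
// then supplies matching rows for the processor under test to read.
class RecordParserSketch {
    static MockRecordParser build() {
        final MockRecordParser parser = new MockRecordParser();
        parser.addSchemaField("id", RecordFieldType.INT);
        parser.addSchemaField("name", RecordFieldType.STRING);
        parser.addSchemaField("code", RecordFieldType.INT);
        parser.addRecord(1, "Alice", 100); // values in field order: id, name, code
        parser.addRecord(2, "Bob", 200);
        return parser;
    }
}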
@ -117,10 +117,6 @@ public class TestCreateHadoopSequenceFile {
        final String valueType = BytesWritable.class.getCanonicalName();
        assertEquals(valueType.length(), valueTypeLength);
        assertEquals(valueType, new String(data, valueTypeStart, valueType.length(), "UTF-8"));
        // FileOutputStream fos = new FileOutputStream("test.sf");
        // fos.write(data);
        // fos.flush();
        // fos.close();
    }

    @Test
@ -157,10 +153,6 @@ public class TestCreateHadoopSequenceFile {
            assertTrue(data.length > 1000000);
            assertTrue(data.length < 1501000);
        }
        // FileOutputStream fos = new FileOutputStream("zip-3-randoms.sf");
        // fos.write(data);
        // fos.flush();
        // fos.close();
    }

    @Test
@ -180,10 +172,6 @@ public class TestCreateHadoopSequenceFile {
            assertTrue(data.length > 1000000);
            assertTrue(data.length < 1501000);
        }
        // FileOutputStream fos = new FileOutputStream("flowfilePkg-3-randoms.sf");
        // fos.write(data);
        // fos.flush();
        // fos.close();
    }

    @Test
@ -50,7 +50,7 @@ public class ExtractImageMetadataTest {

    @Test
    public void testFailedExtraction() throws IOException {
        MockFlowFile flowFile = verifyTestRunnerFlow("src/test/resources/notImage.txt", ExtractImageMetadata.FAILURE, null);
        verifyTestRunnerFlow("src/test/resources/notImage.txt", ExtractImageMetadata.FAILURE, null);
    }

    @Test
@ -14,13 +14,12 @@
 */
package org.apache.nifi.processors.network.parser.util;

import java.math.BigInteger;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;

public final class ConversionUtil {
    public static final String toIPV4(final byte[] buffer, final int offset, final int length) {
    public static String toIPV4(final byte[] buffer, final int offset, final int length) {
        try {
            return InetAddress.getByAddress(Arrays.copyOfRange(buffer, offset, offset + length)).getHostAddress();
        } catch (UnknownHostException e) {
@ -28,32 +27,16 @@ public final class ConversionUtil {
        }
    }

    public static final String toIPV6(final byte[] buffer, final int offset, final int length) {
        try {
            return InetAddress.getByAddress(Arrays.copyOfRange(buffer, offset, offset + length)).getHostAddress();
        } catch (UnknownHostException e) {
            return String.valueOf(toLong(buffer, offset, length));
        }
    }

    public static final BigInteger toBigInteger(final byte[] buffer, final int offset, final int length) {
        return new BigInteger(Arrays.copyOfRange(buffer, offset, offset + length));
    }

    public static final byte toByte(final byte[] buffer, final int offset) {
        return (byte) (buffer[offset] & 0xff);
    }

    public static final int toInt(final byte[] buffer, final int offset, final int length) {
    public static int toInt(final byte[] buffer, final int offset, final int length) {
        int ret = 0;
        final int done = offset + length;
        for (int i = offset; i < done; i++) {
            ret = ((ret << 8) & 0xffffffff) + (buffer[i] & 0xff);
            ret = ((ret << 8)) + (buffer[i] & 0xff);
        }
        return ret;
    }

    public static final long toLong(final byte[] buffer, final int offset, final int length) {
    public static long toLong(final byte[] buffer, final int offset, final int length) {
        long ret = 0;
        final int done = offset + length;
        for (int i = offset; i < done; i++) {
@ -63,7 +46,7 @@ public final class ConversionUtil {
        return ret;
    }

    public static final short toShort(final byte[] buffer, final int offset, final int length) {
    public static short toShort(final byte[] buffer, final int offset, final int length) {
        short ret = 0;
        final int done = offset + length;
        for (int i = offset; i < done; i++) {
@ -72,14 +55,4 @@ public final class ConversionUtil {
        return ret;
    }

    public static final String toString(final byte[] buffer, final int offset, final int length) {
        return new String(Arrays.copyOfRange(buffer, offset, offset + length));
    }

    public static byte[] toByteArray(final int i) {
        final byte[] result = new byte[2];
        result[0] = (byte) (i >> 8);
        result[1] = (byte) (i);
        return result;
    }
}
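The change inside toInt is worth a second look: the loop assembles a big-endian value by shifting the accumulator left eight bits and adding the next unsigned byte, and masking an int with 0xffffffff is a no-op because an int already holds exactly 32 bits, which is why the mask is dropped. A standalone sketch of the same accumulation:

// Standalone illustration (not part of the diff) of the big-endian
// accumulation used by ConversionUtil.toInt.
public class BigEndianDemo {
    static int toInt(final byte[] buffer, final int offset, final int length) {
        int ret = 0;
        for (int i = offset; i < offset + length; i++) {
            ret = (ret << 8) + (buffer[i] & 0xff); // same result with or without "& 0xffffffff"
        }
        return ret;
    }

    public static void main(String[] args) {
        final byte[] bytes = {0x12, 0x34, (byte) 0xAB, (byte) 0xCD};
        System.out.printf("0x%08X%n", toInt(bytes, 0, 4)); // prints 0x1234ABCD
    }
}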
@ -220,7 +220,6 @@ public class GetShopify extends AbstractProcessor {
    private static final int TOO_MANY_REQUESTS = 429;
    private static final Pattern CURSOR_PATTERN = Pattern.compile("<([^<]*)>; rel=\"next\"");
    private static final String LAST_EXECUTION_TIME_KEY = "last_execution_time";
    private static final int EXCLUSIVE_TIME_WINDOW_ADJUSTMENT = 1;
    private static final List<String> RESET_STATE_PROPERTY_NAMES;

    static {
@ -564,7 +564,6 @@ public class ExecuteStreamCommand extends AbstractProcessor {
        int exitCode;
        final boolean putToAttribute;
        final int attributeSize;
        final String attributeName;

        byte[] outputBuffer;
        int size;
@ -580,7 +579,6 @@ public class ExecuteStreamCommand extends AbstractProcessor {
            this.process = process;
            this.putToAttribute = putToAttribute;
            this.attributeSize = attributeSize;
            this.attributeName = attributeName;
        }

        @Override
@ -26,9 +26,6 @@ public enum CertificateAttribute {
    /** Certificate Issuer Distinguished Name */
    HTTP_ISSUER_DN("http.issuer.dn"),

    /** Certificate Subject Distinguished Name */
    HTTP_CERTIFICATE_PARSING_EXCEPTION("http.certificate.parsing.exception"),

    /** Certificate Subject Alternative Names */
    HTTP_CERTIFICATE_SANS("http.certificate.sans");

@ -31,7 +31,6 @@ import jakarta.ws.rs.Path;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.standard.ListenHTTP;
import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper;
import org.apache.nifi.security.cert.StandardPrincipalFormatter;
@ -43,7 +42,6 @@ public class ContentAcknowledgmentServlet extends HttpServlet {
    public static final String DEFAULT_FOUND_SUBJECT = "none";
    private static final long serialVersionUID = -2675148117984902978L;

    private Processor processor;
    private Pattern authorizedPattern;
    private ComponentLog logger;
    private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap;
@ -53,7 +51,6 @@ public class ContentAcknowledgmentServlet extends HttpServlet {
    @Override
    public void init(final ServletConfig config) throws ServletException {
        final ServletContext context = config.getServletContext();
        this.processor = (Processor) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PROCESSOR);
        this.logger = (ComponentLog) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_LOGGER);
        this.authorizedPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN);
        this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP);
@ -82,10 +82,6 @@ public class TestLogMessage {
        assertEquals(1, successFlowFiles.size());

        MockComponentLog mockComponentLog = testableLogMessage.getMockComponentLog();
        List<org.apache.nifi.util.LogMessage> infoMessages = mockComponentLog.getInfoMessages();
        //TODO NIFI-12998 find why these fail
        //assertEquals(1, infoMessages.size());
        //assertTrue(infoMessages.get(0).getMsg().endsWith("This should help the operator to follow the flow: baz"));

        assertTrue(mockComponentLog.getTraceMessages().isEmpty());
        assertTrue(mockComponentLog.getDebugMessages().isEmpty());
@ -110,12 +106,6 @@ public class TestLogMessage {
        assertEquals(1, successFlowFiles.size());

        MockComponentLog mockComponentLog = testableLogMessage.getMockComponentLog();
        List<org.apache.nifi.util.LogMessage> infoMessages = mockComponentLog.getInfoMessages();
        //TODO NIFI-12998 find why these fail
        //assertEquals(1, infoMessages.size());
        //assertTrue(infoMessages.get(0).getMsg().endsWith("FOOBAR>>>This should help the operator to follow the flow: baz"));



        assertTrue(mockComponentLog.getTraceMessages().isEmpty());
        assertTrue(mockComponentLog.getDebugMessages().isEmpty());
@ -36,55 +36,6 @@ public class TestTailFileGeneratedScenarios extends AbstractTestTailFileScenario

    @BeforeAll
    public static final void createParameters() {
        // Uncomment the portion for which to run the scenarios.
        // They cannot be added to a single large batch because it opens too many files.
        // final List<Action> baseActions = Arrays.asList(
        //     Action.WRITE_WORD,
        //     Action.WRITE_NUL,
        //     Action.WRITE_NEW_LINE,
        //     Action.WRITE_NUL,
        //     Action.OVERWRITE_NUL,
        //     Action.WRITE_WORD,
        //     Action.OVERWRITE_NUL,
        //     Action.WRITE_NUL,
        //     Action.WRITE_NEW_LINE,
        //     Action.OVERWRITE_NUL,
        //     Action.WRITE_NUL,
        //     Action.OVERWRITE_NUL
        // );
        // addAction(parameters, Action.TRIGGER, baseActions, 0, 0);
        // new ArrayList<>(parameters).forEach(anActionList -> addAction(parameters, Action.ROLLOVER, (List<Action>)anActionList[0], 0, 0));
        // new ArrayList<>(parameters).forEach(anActionList -> addAction(parameters, Action.SWITCH_FILE, (List<Action>)anActionList[0], 0, 0));

        // final List<Action> baseActions = Arrays.asList(
        //     Action.WRITE_WORD,
        //     Action.WRITE_NEW_LINE,
        //     Action.WRITE_NUL,
        //     Action.OVERWRITE_NUL,
        //     Action.WRITE_WORD,
        //     Action.WRITE_NUL,
        //     Action.WRITE_NEW_LINE,
        //     Action.OVERWRITE_NUL,
        //     Action.ROLLOVER,
        //     Action.WRITE_WORD,
        //     Action.WRITE_NEW_LINE,
        //     Action.WRITE_NUL,
        //     Action.OVERWRITE_NUL,
        //     Action.WRITE_WORD,
        //     Action.WRITE_NUL,
        //     Action.WRITE_NEW_LINE,
        //     Action.OVERWRITE_NUL,
        //     Action.SWITCH_FILE,
        //     Action.WRITE_WORD,
        //     Action.WRITE_NEW_LINE,
        //     Action.WRITE_NUL,
        //     Action.OVERWRITE_NUL,
        //     Action.WRITE_WORD,
        //     Action.WRITE_NUL,
        //     Action.WRITE_NEW_LINE,
        //     Action.OVERWRITE_NUL
        // );
        // addAction(parameters, Action.TRIGGER, baseActions, 0, 1);

        final List<Action> baseActions = Arrays.asList(
                Action.WRITE_WORD, Action.WRITE_WORD,
@ -39,8 +39,6 @@ public class BundleDetailsTest {
        final String buildJdk = "JDK8";
        final String builtBy = "bbende";

        final boolean cloneDuringInstanceClassLoading = true;

        final BundleDetails bundleDetails = new BundleDetails.Builder()
                .workingDir(workingDirectory)
                .coordinate(coordinate)
@ -524,7 +524,6 @@ public class FileAccessPolicyProviderTest {
    @Test
    public void testOnConfiguredWhenNodeGroupEmpty() throws Exception {
        final String adminIdentity = "admin-user";
        final String nodeGroupIdentifier = "cluster-nodes";

        when(configurationContext.getProperty(eq(FileAccessPolicyProvider.PROP_INITIAL_ADMIN_IDENTITY)))
                .thenReturn(new StandardPropertyValue(adminIdentity, null, ParameterLookup.EMPTY));
@ -42,12 +42,6 @@
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-socket-utils</artifactId>
            <version>2.0.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-security-utils-api</artifactId>
@ -74,11 +74,6 @@
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-socket-utils</artifactId>
            <version>2.0.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-security-utils-api</artifactId>
@ -1,123 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.cluster.coordination.node;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
import org.apache.nifi.cluster.protocol.AbstractNodeProtocolSender;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.framework.cluster.zookeeper.ZooKeeperClientConfig;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.apache.nifi.util.NiFiProperties;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Uses Apache Curator to determine the address of the current cluster
 * coordinator
 */
public class CuratorNodeProtocolSender extends AbstractNodeProtocolSender {

    private static final Logger logger = LoggerFactory.getLogger(CuratorNodeProtocolSender.class);

    private final String coordinatorPath;
    private final ZooKeeperClientConfig zkConfig;
    private InetSocketAddress coordinatorAddress;

    public CuratorNodeProtocolSender(final SocketConfiguration socketConfig, final ProtocolContext<ProtocolMessage> protocolContext, final NiFiProperties nifiProperties) {
        super(socketConfig, protocolContext);
        zkConfig = ZooKeeperClientConfig.createConfig(nifiProperties);
        coordinatorPath = zkConfig.resolvePath("cluster/nodes/coordinator");
    }

    @Override
    protected synchronized InetSocketAddress getServiceAddress() throws IOException {
        if (coordinatorAddress != null) {
            return coordinatorAddress;
        }

        final RetryPolicy retryPolicy = new RetryNTimes(0, 0);
        final CuratorFramework curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(),
                zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy);
        curatorClient.start();

        try {
            // Get coordinator address and add watcher to change who we are heartbeating to if the value changes.
            final byte[] coordinatorAddressBytes = curatorClient.getData().usingWatcher(new Watcher() {
                @Override
                public void process(final WatchedEvent event) {
                    coordinatorAddress = null;
                }
            }).forPath(coordinatorPath);

            if (coordinatorAddressBytes == null || coordinatorAddressBytes.length == 0) {
                throw new NoClusterCoordinatorException("No node has yet been elected Cluster Coordinator. Cannot establish connection to cluster yet.");
            }

            final String address = new String(coordinatorAddressBytes, StandardCharsets.UTF_8);

            final String[] splits = address.split(":");
            if (splits.length != 2) {
                final String message = String.format("Attempted to determine Cluster Coordinator address. Zookeeper indicates "
                        + "that address is %s, but this is not in the expected format of <hostname>:<port>", address);
                logger.error(message);
                throw new ProtocolException(message);
            }

            logger.info("Determined that Cluster Coordinator is located at {}; will use this address for sending heartbeat messages", address);

            final String hostname = splits[0];
            final int port;
            try {
                port = Integer.parseInt(splits[1]);
                if (port < 1 || port > 65535) {
                    throw new NumberFormatException("Port must be in the range of 1 - 65535 but got " + port);
                }
            } catch (final NumberFormatException nfe) {
                final String message = String.format("Attempted to determine Cluster Coordinator address. Zookeeper indicates "
                        + "that address is %s, but the port is not a valid port number", address);
                logger.error(message);
                throw new ProtocolException(message);
            }

            final InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(hostname, port);
            coordinatorAddress = socketAddress;
            return socketAddress;
        } catch (final NoNodeException nne) {
            logger.info("No node has yet been elected Cluster Coordinator. Cannot establish connection to cluster yet.");
            throw new NoClusterCoordinatorException("No node has yet been elected Cluster Coordinator. Cannot establish connection to cluster yet.");
        } catch (final NoClusterCoordinatorException ncce) {
            throw ncce;
        } catch (Exception e) {
            throw new IOException("Unable to determine Cluster Coordinator from ZooKeeper", e);
        } finally {
            curatorClient.close();
        }
    }

}
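The deleted sender follows a common Curator idiom: read a znode once, cache the parsed value, and attach a watcher that clears the cache when the znode changes. A minimal self-contained sketch of that idiom (the connect string and znode path are placeholders, not NiFi values):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the read-once-and-watch pattern used by the removed class.
public class CachedZNodeReader {
    private final AtomicReference<String> cached = new AtomicReference<>();

    public String read() throws Exception {
        final String known = cached.get();
        if (known != null) {
            return known;
        }
        final CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryNTimes(0, 0));
        client.start();
        try {
            // The watcher fires at most once; clearing the cache forces the next call to re-read.
            final byte[] data = client.getData()
                    .usingWatcher((org.apache.zookeeper.Watcher) event -> cached.set(null))
                    .forPath("/demo/coordinator");
            final String fresh = new String(data, StandardCharsets.UTF_8);
            cached.set(fresh);
            return fresh;
        } finally {
            client.close();
        }
    }
}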
@ -304,7 +304,7 @@ public class TestThreadPoolRequestReplicator {
                new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true, true);
        clusterResponse.awaitMergedResponse();

        // Ensure that we received two requests - the first should contain the X-NcmExpects header; the second should not.
        // Ensure that we received two requests
        // These assertions are validated above, in the overridden replicateRequest method.
        assertEquals(2, requestCount.get());

@ -2519,7 +2519,6 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
        final List<RepositoryRecord> expiredRecords = new ArrayList<>(flowFiles.size());

        final Connectable connectable = context.getConnectable();
        final String processorType = connectable.getComponentType();
        final InternalProvenanceReporter expiredReporter = context.createProvenanceReporter(this::isFlowFileKnown, this);

        final Map<String, FlowFileRecord> recordIdMap = new HashMap<>();
@ -142,9 +142,6 @@ public class StandardControllerServiceInvocationHandler implements ControllerSer
        // may perform further inspection. For example, consider that a javax.jms.Message is returned. If this method proxies
        // only that method, but the object itself is a javax.jms.BytesMessage, then code such as the following will result in `isBytes == false`
        // when it should be `true`:
        //
        // final javax.jms.Message myMessage = controllerService.getMessage();
        // final boolean isBytes = myMessage instanceof javax.jms.BytesMessage;
        final List<Class<?>> interfaces = ClassUtils.getAllInterfaces(bareObject.getClass());
        if (interfaces == null || interfaces.isEmpty()) {
            return bareObject;
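The retained comment is the key constraint here: a JDK dynamic proxy only answers instanceof for the interfaces it was constructed with, which is why the handler gathers every interface of the concrete object rather than just the declared return type. A self-contained sketch of that behavior (the Payment and Refundable interfaces are invented for the demo, not NiFi types):

import org.apache.commons.lang3.ClassUtils;

import java.lang.reflect.Proxy;
import java.util.List;

// Demonstrates why proxying all interfaces matters: the proxy satisfies
// "instanceof Refundable" only because every interface of the concrete
// class was passed to Proxy.newProxyInstance.
public class ProxyInterfaceDemo {
    interface Payment { }
    interface Refundable extends Payment { }
    static class CardPayment implements Refundable { }

    public static void main(String[] args) {
        final Object bare = new CardPayment();
        final List<Class<?>> interfaces = ClassUtils.getAllInterfaces(bare.getClass());
        final Object proxy = Proxy.newProxyInstance(
                bare.getClass().getClassLoader(),
                interfaces.toArray(new Class<?>[0]),
                (p, method, methodArgs) -> method.invoke(bare, methodArgs));
        System.out.println(proxy instanceof Payment);    // true
        System.out.println(proxy instanceof Refundable); // true, because all interfaces were proxied
    }
}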
@ -657,18 +657,6 @@ public abstract class AbstractComponentNode implements ComponentNode {
        return getPropertyValues((descriptor, config) -> getConfigValue(config, isResolveParameter(descriptor, config)));
    }

    /**
     * Converts from a Map of PropertyDescriptor to value, to a Map of property name to value
     *
     * @param propertyValues the property values to convert
     * @return a Map whose keys are the names of the properties instead of descriptors
     */
    public Map<String, String> toPropertyNameMap(final Map<PropertyDescriptor, String> propertyValues) {
        final Map<String, String> converted = new HashMap<>();
        propertyValues.forEach((key, value) -> converted.put(key.getName(), value));
        return converted;
    }

    private Map<PropertyDescriptor, String> getPropertyValues(final BiFunction<PropertyDescriptor, PropertyConfiguration, String> valueFunction) {
        try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, getComponent().getClass(), getIdentifier())) {
            final List<PropertyDescriptor> supported = getComponent().getPropertyDescriptors();
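Should a caller ever need the removed descriptor-to-name conversion again, the same mapping is a one-liner with the standard collectors. A sketch (not from the commit; note that, unlike the HashMap-based original, Collectors.toMap rejects null values):

import org.apache.nifi.components.PropertyDescriptor;

import java.util.Map;
import java.util.stream.Collectors;

// Inline equivalent of the removed toPropertyNameMap helper.
final class PropertyNameMaps {
    static Map<String, String> byName(final Map<PropertyDescriptor, String> propertyValues) {
        return propertyValues.entrySet().stream()
                .collect(Collectors.toMap(entry -> entry.getKey().getName(), Map.Entry::getValue));
    }
}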
@ -1,32 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.groups;

public enum DataflowDirection {

    /**
     * Data is currently flowing into the Process Group
     */
    INTO_PROCESS_GROUP,

    /**
     * Data is currently flowing out of the Process Group
     */
    OUT_OF_PROCESS_GROUP;

}
@ -71,11 +71,6 @@
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-socket-utils</artifactId>
            <version>2.0.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>c2-protocol-component-api</artifactId>
@ -805,7 +805,6 @@ public class ExtensionBuilder {
        try {
            final LoggableComponent<FlowAnalysisRule> loggableComponent = createLoggableComponent(FlowAnalysisRule.class, new StandardLoggingContext(null));

            final String taskName = loggableComponent.getComponent().getClass().getSimpleName();
            final FlowAnalysisRuleInitializationContext config = new StandardFlowAnalysisInitializationContext(identifier,
                    loggableComponent.getLogger(), serviceProvider, kerberosConfig, nodeTypeProvider);

@ -2234,11 +2234,11 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr
        final FlowFileQueue flowFileQueue;

        if (clusterCoordinator == null) {
            flowFileQueue = new StandardFlowFileQueue(id, flowFileRepository, provenanceRepository, resourceClaimManager, processScheduler, swapManager,
            flowFileQueue = new StandardFlowFileQueue(id, flowFileRepository, provenanceRepository, processScheduler, swapManager,
                    eventReporter, nifiProperties.getQueueSwapThreshold(),
                    processGroup.getDefaultFlowFileExpiration(), processGroup.getDefaultBackPressureObjectThreshold(), processGroup.getDefaultBackPressureDataSizeThreshold());
        } else {
            flowFileQueue = new SocketLoadBalancedFlowFileQueue(id, processScheduler, flowFileRepository, provenanceRepository, contentRepository, resourceClaimManager,
            flowFileQueue = new SocketLoadBalancedFlowFileQueue(id, processScheduler, flowFileRepository, provenanceRepository, contentRepository,
                    clusterCoordinator, loadBalanceClientRegistry, swapManager, nifiProperties.getQueueSwapThreshold(), eventReporter);

            flowFileQueue.setFlowFileExpiration(processGroup.getDefaultFlowFileExpiration());
@ -915,7 +915,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
            return null;
        } else {
            // cluster manager provided a successful response with a current dataflow
            // persist node uuid and index returned by NCM and return the response to the caller
            // persist node uuid and index returned by coordinator and return the response to the caller
            try {
                // Ensure that we have registered our 'cluster node configuration' state key
                final Map<String, String> map = Collections.singletonMap(NODE_UUID, response.getNodeIdentifier().getId());
@ -23,7 +23,6 @@ import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.RepositoryRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
@ -55,7 +54,6 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue {
    private final String identifier;
    private final FlowFileRepository flowFileRepository;
    private final ProvenanceEventRepository provRepository;
    private final ResourceClaimManager resourceClaimManager;
    private final ProcessScheduler scheduler;

    private final AtomicReference<TimePeriod> expirationPeriod = new AtomicReference<>(new TimePeriod("0 sec", 0L));
@ -75,12 +73,11 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue {


    public AbstractFlowFileQueue(final String identifier, final ProcessScheduler scheduler,
            final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo, final ResourceClaimManager resourceClaimManager) {
            final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo) {
        this.identifier = identifier;
        this.scheduler = scheduler;
        this.flowFileRepository = flowFileRepo;
        this.provRepository = provRepo;
        this.resourceClaimManager = resourceClaimManager;
    }

    @Override
@ -21,7 +21,6 @@ import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
@ -52,10 +51,10 @@ public class StandardFlowFileQueue extends AbstractFlowFileQueue implements Flow


    public StandardFlowFileQueue(final String identifier, final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo,
            final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter,
            final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter,
            final int swapThreshold, final String expirationPeriod, final long defaultBackPressureObjectThreshold, final String defaultBackPressureDataSizeThreshold) {

        super(identifier, scheduler, flowFileRepo, provRepo, resourceClaimManager);
        super(identifier, scheduler, flowFileRepo, provRepo);
        super.setFlowFileExpiration(expirationPeriod);
        this.swapManager = swapManager;
        this.queue = new SwappablePriorityQueue(swapManager, swapThreshold, eventReporter, this, this::drop, null);
@ -60,7 +60,6 @@ import org.apache.nifi.controller.repository.StandardRepositoryRecord;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
@ -125,11 +124,11 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple


    public SocketLoadBalancedFlowFileQueue(final String identifier, final ProcessScheduler scheduler, final FlowFileRepository flowFileRepo,
            final ProvenanceEventRepository provRepo, final ContentRepository contentRepo, final ResourceClaimManager resourceClaimManager,
            final ProvenanceEventRepository provRepo, final ContentRepository contentRepo,
            final ClusterCoordinator clusterCoordinator, final AsyncLoadBalanceClientRegistry clientRegistry, final FlowFileSwapManager swapManager,
            final int swapThreshold, final EventReporter eventReporter) {

        super(identifier, scheduler, flowFileRepo, provRepo, resourceClaimManager);
        super(identifier, scheduler, flowFileRepo, provRepo);
        this.eventReporter = eventReporter;
        this.swapManager = swapManager;
        this.flowFileRepo = flowFileRepo;
@ -19,13 +19,14 @@ package org.apache.nifi.controller.queue.clustered.server;

import org.apache.commons.io.IOUtils;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.io.socket.SocketUtils;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.security.util.TlsPlatform;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLServerSocket;
import java.io.BufferedInputStream;
@ -37,6 +38,7 @@ import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@ -182,7 +184,7 @@ public class ConnectionLoadBalanceServer {
                /* The exceptions can fill the log very quickly and make it difficult to use. SSLPeerUnverifiedExceptions
                especially repeat and have a long stacktrace, and are not likely to be resolved instantaneously. Suppressing
                them for a period of time is helpful */
                if (SocketUtils.isTlsError(e)) {
                if (isTlsError(e)) {
                    handleTlsError(channelDescription, e);
                } else {
                    logger.error("Failed to communicate over Channel {}", channelDescription, e);
@ -194,6 +196,20 @@ public class ConnectionLoadBalanceServer {
            }
        }

        private static boolean isTlsError(Throwable e) {
            if (e == null) {
                return false;
            } else {
                if (e instanceof CertificateException || e instanceof TlsException || e instanceof SSLException) {
                    return true;
                } else if (e.getCause() != null) {
                    return isTlsError(e.getCause());
                } else {
                    return false;
                }
            }
        }

        /**
         * Determines how to record the TLS-related error
         * ({@link org.apache.nifi.security.util.TlsException}, {@link SSLPeerUnverifiedException},
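The new isTlsError helper simply walks the exception's cause chain looking for a TLS-related type, replacing the same check formerly provided by the removed nifi-socket-utils SocketUtils. A standalone check of that behavior on a wrapped exception (trimmed to JDK-only types so it compiles outside NiFi; the real method also matches org.apache.nifi.security.util.TlsException):

import javax.net.ssl.SSLException;
import java.io.IOException;
import java.security.cert.CertificateException;

// Mirrors the cause-chain walk added above, minus the NiFi-specific type.
public class TlsErrorDemo {
    static boolean isTlsError(final Throwable e) {
        if (e == null) {
            return false;
        }
        if (e instanceof CertificateException || e instanceof SSLException) {
            return true;
        }
        return isTlsError(e.getCause());
    }

    public static void main(String[] args) {
        final Exception wrapped = new IOException(new SSLException("handshake failed"));
        System.out.println(isTlsError(wrapped));                  // true: the SSLException is one cause down
        System.out.println(isTlsError(new IOException("plain"))); // false: no TLS type in the chain
    }
}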
@ -82,7 +82,6 @@ import java.util.stream.Collectors;
 */
public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncListener {
    static final String FLOWFILE_REPOSITORY_DIRECTORY_PREFIX = "nifi.flowfile.repository.directory";
    private static final String WRITE_AHEAD_LOG_IMPL = "nifi.flowfile.repository.wal.implementation";
    private static final String RETAIN_ORPHANED_FLOWFILES = "nifi.flowfile.repository.retain.orphaned.flowfiles";
    private static final String FLOWFILE_REPO_CACHE_SIZE = "nifi.flowfile.repository.wal.cache.characters";

@ -161,7 +160,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
        retainOrphanedFlowFiles = orphanedFlowFileProperty == null || Boolean.parseBoolean(orphanedFlowFileProperty);

        // determine the database file path and ensure it exists
        String writeAheadLogImpl = nifiProperties.getProperty(WRITE_AHEAD_LOG_IMPL);
        String writeAheadLogImpl = nifiProperties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_WAL_IMPLEMENTATION);
        if (writeAheadLogImpl == null) {
            writeAheadLogImpl = DEFAULT_WAL_IMPLEMENTATION;
        }
@ -212,8 +211,8 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
            // TODO: May need to instantiate ESAWAL for clarity?
            wal = new SequentialAccessWriteAheadLog<>(flowFileRepositoryPaths.get(0), serdeFactory, this);
        } else {
            throw new IllegalStateException("Cannot create Write-Ahead Log because the configured property '" + WRITE_AHEAD_LOG_IMPL + "' has an invalid value of '" + walImplementation
                    + "'. Please update nifi.properties to indicate a valid value for this property.");
            throw new IllegalStateException("Cannot create Write-Ahead Log because the configured property '" + NiFiProperties.FLOWFILE_REPOSITORY_WAL_IMPLEMENTATION +
                    "' has an invalid value of '" + walImplementation + "'. Please update nifi.properties to indicate a valid value for this property.");
        }

        logger.info("Initialized FlowFile Repository");
@ -21,8 +21,6 @@ import java.util.Map;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <p>
@ -30,7 +28,6 @@ import org.slf4j.LoggerFactory;
 * </p>
 */
public class ConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine {
    private static final Logger LOG = LoggerFactory.getLogger(ConnectionStatusAnalyticsEngine.class);
    protected final StatusHistoryRepository statusRepository;
    protected final FlowManager flowManager;
    protected final StatusAnalyticsModelMapFactory statusAnalyticsModelMapFactory;
@ -1,52 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.wali;

import java.io.File;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wali.SerDeFactory;
import org.wali.SyncListener;

/**
 * <p>
 * This implementation of {@link org.wali.WriteAheadRepository} is just a marker implementation wrapping the {@link SequentialAccessWriteAheadLog}. It exists to allow
 * users to configure {@code nifi.properties} with
 * {@code nifi.flowfile.repository.wal.implementation=org.apache.nifi.wali.EncryptedSequentialAccessWriteAheadLog}
 * because the {@link org.wali.SerDe} interface is not exposed at that level. By selecting
 * this WAL implementation, the admin is enabling the encrypted flowfile repository, but all
 * other behavior is identical.
 * </p>
 *
 * <p>
 * This implementation transparently encrypts the objects as they are persisted to the journal file.
 * </p>
 */
public class EncryptedSequentialAccessWriteAheadLog<T> extends SequentialAccessWriteAheadLog<T> {
    private static final Logger logger = LoggerFactory.getLogger(EncryptedSequentialAccessWriteAheadLog.class);


    public EncryptedSequentialAccessWriteAheadLog(final File storageDirectory, final SerDeFactory<T> serdeFactory) throws IOException {
        this(storageDirectory, serdeFactory, SyncListener.NOP_SYNC_LISTENER);
    }

    public EncryptedSequentialAccessWriteAheadLog(final File storageDirectory, final SerDeFactory<T> serdeFactory, final SyncListener syncListener) throws IOException {
        super(storageDirectory, serdeFactory, syncListener);
    }
}
@ -27,7 +27,6 @@ import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.StandardFlowFileQueue;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
import org.apache.nifi.provenance.ProvenanceEventRecord;
@ -63,7 +62,6 @@ public class TestStandardFlowFileQueue {
    private Connection connection = null;
    private FlowFileRepository flowFileRepo = null;
    private ProvenanceEventRepository provRepo = null;
    private ResourceClaimManager claimManager = null;
    private ProcessScheduler scheduler = null;

    private List<ProvenanceEventRecord> provRecords = new ArrayList<>();
@ -82,7 +80,6 @@ public class TestStandardFlowFileQueue {

        flowFileRepo = Mockito.mock(FlowFileRepository.class);
        provRepo = Mockito.mock(ProvenanceEventRepository.class);
        claimManager = Mockito.mock(ResourceClaimManager.class);

        Mockito.when(provRepo.eventBuilder()).thenReturn(new StandardProvenanceEventRecord.Builder());
        Mockito.doAnswer(new Answer<Object>() {
@ -96,7 +93,7 @@ public class TestStandardFlowFileQueue {
            }
        }).when(provRepo).registerEvents(Mockito.any(Iterable.class));

        queue = new StandardFlowFileQueue("id", flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000, "0 sec", 0L, "0 B");
        queue = new StandardFlowFileQueue("id", flowFileRepo, provRepo, scheduler, swapManager, null, 10000, "0 sec", 0L, "0 B");
        MockFlowFileRecord.resetIdGenerator();
    }

@ -345,7 +342,7 @@ public class TestStandardFlowFileQueue {
    @Test
    public void testSwapInWhenThresholdIsLessThanSwapSize() {
        // create a queue where the swap threshold is less than 10k
        queue = new StandardFlowFileQueue("id", flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 1000, "0 sec", 0L, "0 B");
        queue = new StandardFlowFileQueue("id", flowFileRepo, provRepo, scheduler, swapManager, null, 1000, "0 sec", 0L, "0 B");

        for (int i = 1; i <= 20000; i++) {
            queue.put(new MockFlowFileRecord());
@ -51,8 +51,6 @@ import org.apache.nifi.controller.repository.RepositoryRecord;
import org.apache.nifi.controller.repository.RepositoryRecordType;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.security.util.SslContextFactory;
@ -118,7 +116,6 @@ public class LoadBalancedQueueIT {
    private ClusterCoordinator clusterCoordinator;
    private NodeIdentifier localNodeId;
    private ProcessScheduler processScheduler;
    private ResourceClaimManager resourceClaimManager;
    private LoadBalancedFlowFileQueue serverQueue;
    private FlowController flowController;

@ -162,7 +159,6 @@ public class LoadBalancedQueueIT {

        processScheduler = mock(ProcessScheduler.class);
        clientProvRepo = mock(ProvenanceRepository.class);
        resourceClaimManager = new StandardResourceClaimManager();
        final Connection connection = mock(Connection.class);
        when(connection.getIdentifier()).thenReturn(queueId);

@ -237,7 +233,7 @@ public class LoadBalancedQueueIT {
        final Thread clientThread = new Thread(clientTask);

        final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, processScheduler, clientFlowFileRepo, clientProvRepo,
                clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
                clientContentRepo, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);

        flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());

@ -346,7 +342,7 @@ public class LoadBalancedQueueIT {
        clientThread.setDaemon(true);

        final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, processScheduler, clientFlowFileRepo, clientProvRepo,
                clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
                clientContentRepo, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
        flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());

        try {
@ -445,7 +441,7 @@ public class LoadBalancedQueueIT {
        clientThread.start();

        final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, processScheduler, clientFlowFileRepo, clientProvRepo,
                clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
                clientContentRepo, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
        flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());

        try {
@ -534,7 +530,7 @@ public class LoadBalancedQueueIT {
        clientThread.start();

        final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, processScheduler, clientFlowFileRepo, clientProvRepo,
                clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
                clientContentRepo, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
        flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());

        try {
@ -606,7 +602,7 @@ public class LoadBalancedQueueIT {
        clientThread.start();

        final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, processScheduler, clientFlowFileRepo, clientProvRepo,
                clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
                clientContentRepo, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
        flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
        flowFileQueue.setLoadBalanceCompression(LoadBalanceCompression.COMPRESS_ATTRIBUTES_ONLY);

@ -697,7 +693,7 @@ public class LoadBalancedQueueIT {
        clientThread.start();

        final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, processScheduler, clientFlowFileRepo, clientProvRepo,
                clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
                clientContentRepo, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
        flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
        flowFileQueue.setLoadBalanceCompression(LoadBalanceCompression.COMPRESS_ATTRIBUTES_AND_CONTENT);

@ -786,7 +782,7 @@ public class LoadBalancedQueueIT {
        clientThread.start();

        final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, processScheduler, clientFlowFileRepo, clientProvRepo,
                clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
                clientContentRepo, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
        flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());

        try {
@ -874,7 +870,7 @@ public class LoadBalancedQueueIT {
        clientThread.start();

        final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, processScheduler, clientFlowFileRepo, clientProvRepo,
                clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
                clientContentRepo, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
        flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());

        try {
@ -962,7 +958,7 @@ public class LoadBalancedQueueIT {
        clientThread.start();

        final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, processScheduler, clientFlowFileRepo, clientProvRepo,
                clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
                clientContentRepo, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
        flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());

        final byte[] payload = new byte[1024 * 1024];
@ -1079,7 +1075,7 @@ public class LoadBalancedQueueIT {
        clientThread.start();

        final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, processScheduler, clientFlowFileRepo, clientProvRepo,
                clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
                clientContentRepo, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
        flowFileQueue.setFlowFilePartitioner(new FlowFilePartitioner() {
            @Override
            public QueuePartition getPartition(final FlowFileRecord flowFile, final QueuePartition[] partitions, final QueuePartition localPartition) {
@ -1185,7 +1181,7 @@ public class LoadBalancedQueueIT {
        clientThread.start();

        final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, processScheduler, clientFlowFileRepo, clientProvRepo,
                clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
                clientContentRepo, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
        flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());

        try {
@ -1254,7 +1250,7 @@ public class LoadBalancedQueueIT {
        clientThread.start();

        final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, processScheduler, clientFlowFileRepo, clientProvRepo,
                clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
                clientContentRepo, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
        flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());

        try {
@ -35,8 +35,6 @@ import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
@ -76,7 +74,6 @@ public class TestSocketLoadBalancedFlowFileQueue {
    private FlowFileRepository flowFileRepo;
    private ContentRepository contentRepo;
    private ProvenanceEventRepository provRepo;
    private ResourceClaimManager claimManager;
    private ClusterCoordinator clusterCoordinator;
    private MockSwapManager swapManager;
    private EventReporter eventReporter;
@ -95,7 +92,6 @@ public class TestSocketLoadBalancedFlowFileQueue {
        flowFileRepo = mock(FlowFileRepository.class);
        contentRepo = mock(ContentRepository.class);
        provRepo = mock(ProvenanceEventRepository.class);
        claimManager = new StandardResourceClaimManager();
        clusterCoordinator = mock(ClusterCoordinator.class);
        swapManager = new MockSwapManager();
        eventReporter = EventReporter.NO_OP;
@ -128,7 +124,7 @@ public class TestSocketLoadBalancedFlowFileQueue {

        final AsyncLoadBalanceClientRegistry registry = mock(AsyncLoadBalanceClientRegistry.class);
        queue = new SocketLoadBalancedFlowFileQueue("unit-test", scheduler, flowFileRepo, provRepo,
                contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter);
                contentRepo, clusterCoordinator, registry, swapManager, 10000, eventReporter);
    }

    private NodeIdentifier createNodeIdentifier() {
@ -223,7 +219,7 @@ public class TestSocketLoadBalancedFlowFileQueue {
        when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(null);

        queue = new SocketLoadBalancedFlowFileQueue("unit-test", scheduler, flowFileRepo, provRepo,
                contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter);
                contentRepo, clusterCoordinator, registry, swapManager, 10000, eventReporter);
        queue.setPriorities(Collections.singletonList(iValuePrioritizer));

        when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(null);
@ -563,7 +559,7 @@ public class TestSocketLoadBalancedFlowFileQueue {

        final AsyncLoadBalanceClientRegistry registry = mock(AsyncLoadBalanceClientRegistry.class);
        queue = new SocketLoadBalancedFlowFileQueue("unit-test", mock(ProcessScheduler.class), flowFileRepo, provRepo,
                contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter);
                contentRepo, clusterCoordinator, registry, swapManager, 10000, eventReporter);

        queue.setFlowFilePartitioner(new RoundRobinPartitioner());

@ -238,7 +238,7 @@ public class StandardProcessSessionIT {
        final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class);
        final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class);

        final StandardFlowFileQueue actualQueue = new StandardFlowFileQueue("1", flowFileRepo, provenanceRepo, null,
        final StandardFlowFileQueue actualQueue = new StandardFlowFileQueue("1", flowFileRepo, provenanceRepo,
                processScheduler, swapManager, null, 10000, "0 sec", 0L, "0 B");
        return Mockito.spy(actualQueue);
    }
@ -110,10 +110,6 @@ public class TestFileSystemRepository {
            try (final OutputStream out = repository.write(claim)) {
                out.write(content);
            }
            // final ContentClaim claim = cache.getContentClaim();
            // try (final OutputStream out = cache.write(claim)) {
            //     out.write(content);
            // }
        }
        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

@ -537,7 +537,7 @@ public class TestWriteAheadFlowFileRepository {
        when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));

        final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
        final FlowFileQueue queue = new StandardFlowFileQueue("1234", null, null, claimManager, null, swapMgr, null, 10000, "0 sec", 0L, "0 B");
        final FlowFileQueue queue = new StandardFlowFileQueue("1234", null, null, null, swapMgr, null, 10000, "0 sec", 0L, "0 B");

        when(connection.getFlowFileQueue()).thenReturn(queue);
        queueProvider.addConnection(connection);
@@ -67,13 +67,6 @@ nifi.state.management.embedded.zookeeper.properties=${nifi.state.management.embe
# Database Settings
nifi.database.directory=${nifi.database.directory}

-# Repository Encryption properties override individual repository implementation properties
-nifi.repository.encryption.protocol.version=
-nifi.repository.encryption.key.id=
-nifi.repository.encryption.key.provider=
-nifi.repository.encryption.key.provider.keystore.location=
-nifi.repository.encryption.key.provider.keystore.password=
-
# FlowFile Repository
nifi.flowfile.repository.implementation=${nifi.flowfile.repository.implementation}
nifi.flowfile.repository.wal.implementation=${nifi.flowfile.repository.wal.implementation}

@@ -853,8 +853,8 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {

// if this nifi is a node in a cluster, start the flow service and load the flow - the
// flow service is loaded here for clustered nodes because the loading of the flow will
-// initialize the connection between the node and the NCM. if the node connects (starts
-// heartbeating, etc), the NCM may issue web requests before the application (wars) have
+// initialize the connection between the node and the coordinator. if the node connects (starts
+// heartbeating, etc), the coordinator may issue web requests before the application (wars) have
// finished loading. this results in the node being disconnected since its unable to
// successfully respond to the requests. to resolve this, flow loading was moved to here
// (after the wars have been successfully deployed) when this nifi instance is a node

@@ -169,7 +169,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
throw new IllegalArgumentException("UI extension type must support Processor, ControllerService, or ReportingTask configuration.");
}

-// - when running standalone or cluster ncm - actions from custom UIs are stored locally
+// - when running standalone or cluster - actions from custom UIs are stored locally
// - clustered nodes do not serve custom UIs directly to users so they should never be invoking this method
final Date now = new Date();
final Collection<Action> actions = new HashSet<>(configurationActions.size());

@@ -1370,7 +1370,7 @@ public class FlowResource extends ApplicationResource {

authorizeFlow();

-// get the banner from the properties - will come from the NCM when clustered
+// get the banner from the properties
final String bannerText = getProperties().getBannerText();

// create the DTO

@@ -1,32 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.nifi.web.util;
-
-/**
-* Where a given controller service or reporting task should run.
-*/
-public enum Availability {
-
-/**
-* Service or reporting task will run only on the NiFi Cluster Manager (NCM)
-*/
-NCM,
-/**
-* Service or reporting task will run only on NiFi Nodes (or standalone instance, if not clustered)
-*/
-NODE;
-}

@@ -419,24 +419,6 @@ public class OcspCertificateValidator {
// if the responder certificate was issued by the same CA that issued the subject certificate we may be able to use that...
final X500Principal issuerCA = issuerCertificate.getSubjectX500Principal();
if (responderCertificate.getIssuerX500Principal().equals(issuerCA)) {
-// perform a number of verification steps... TODO... from sun.security.provider.certpath.OCSPResponse.java... currently incomplete...
-// try {
-// // ensure appropriate key usage
-// final List<String> keyUsage = responderCertificate.getExtendedKeyUsage();
-// if (keyUsage == null || !keyUsage.contains(KP_OCSP_SIGNING_OID)) {
-// return null;
-// }
-//
-// // ensure the certificate is valid
-// responderCertificate.checkValidity();
-//
-// // verify the signature
-// responderCertificate.verify(issuerCertificate.getPublicKey());
-//
-// return responderCertificate;
-// } catch (final CertificateException | NoSuchAlgorithmException | InvalidKeyException | NoSuchProviderException | SignatureException e) {
-// return null;
-// }
return null;
} else {
return null;

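The deleted comments had sketched how the responder certificate could be vetted before trusting it: confirm the OCSP-signing extended key usage, confirm validity dates, and verify the issuing CA's signature. Below is a compilable sketch of those steps reconstructed from the removed comments; the class name is illustrative, and KP_OCSP_SIGNING_OID (referenced but never defined in this hunk) is assumed to be the id-kp-OCSPSigning OID from RFC 6960.

    import java.security.InvalidKeyException;
    import java.security.NoSuchAlgorithmException;
    import java.security.NoSuchProviderException;
    import java.security.SignatureException;
    import java.security.cert.CertificateException;
    import java.security.cert.X509Certificate;
    import java.util.List;

    final class ResponderCertificateChecks {

        // Assumed value: id-kp-OCSPSigning from RFC 6960.
        private static final String KP_OCSP_SIGNING_OID = "1.3.6.1.5.5.7.3.9";

        // Returns the responder certificate when it passes the three checks the
        // deleted comments outlined, or null when any check fails.
        static X509Certificate verifyResponder(final X509Certificate responderCertificate,
                                               final X509Certificate issuerCertificate) {
            try {
                // ensure the certificate is marked for OCSP response signing
                final List<String> keyUsage = responderCertificate.getExtendedKeyUsage();
                if (keyUsage == null || !keyUsage.contains(KP_OCSP_SIGNING_OID)) {
                    return null;
                }

                // ensure the certificate is within its validity window
                responderCertificate.checkValidity();

                // verify the responder certificate was signed by the issuing CA
                responderCertificate.verify(issuerCertificate.getPublicKey());

                return responderCertificate;
            } catch (final CertificateException | NoSuchAlgorithmException | InvalidKeyException
                    | NoSuchProviderException | SignatureException e) {
                return null;
            }
        }
    }
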
@@ -21,5 +21,4 @@ public class BundleTypeValues {
public static final String NIFI_NAR_VALUE = "nifi-nar";
public static final String MINIFI_CPP_VALUE = "minifi-cpp";

public static final String ALL_VALUES = NIFI_NAR_VALUE + ", " + MINIFI_CPP_VALUE;
}

@@ -1049,7 +1049,6 @@ public class DatabaseMetadataService implements MetadataService {
@Override
public List<ExtensionEntity> getExtensionsByBundleVersionId(final String bundleVersionId) {
final String selectSql = BASE_EXTENSION_SQL + " AND e.bundle_version_id = ?";
-final Object[] args = {bundleVersionId};
return jdbcTemplate.query(selectSql, new ExtensionEntityRowMapper(), bundleVersionId);
}

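The removed Object[] was dead code because the call already uses Spring's varargs overload, JdbcTemplate.query(String, RowMapper, Object...), which binds bundleVersionId directly to the ? placeholder. A minimal sketch of that overload in isolation; the table and row mapper here are hypothetical, not the registry schema.

    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.jdbc.core.RowMapper;
    import java.util.List;

    final class ExtensionQueries {
        private final JdbcTemplate jdbcTemplate;

        ExtensionQueries(final JdbcTemplate jdbcTemplate) {
            this.jdbcTemplate = jdbcTemplate;
        }

        // The varargs overload binds each trailing argument to a ? placeholder in
        // order, so no intermediate Object[] is needed.
        List<String> findNamesByBundleVersionId(final String bundleVersionId) {
            final String sql = "SELECT name FROM extension WHERE bundle_version_id = ?";
            final RowMapper<String> nameMapper = (rs, rowNum) -> rs.getString("name");
            return jdbcTemplate.query(sql, nameMapper, bundleVersionId);
        }
    }
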
@@ -35,8 +35,6 @@ import org.apache.nifi.registry.security.authorization.util.AccessPolicyProvider
import org.apache.nifi.registry.security.authorization.util.UserGroupProviderUtils;
import org.apache.nifi.registry.security.exception.SecurityProviderCreationException;
import org.apache.nifi.registry.security.identity.IdentityMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

@@ -48,8 +46,6 @@ import java.util.regex.Matcher;
*/
public class FileAuthorizer extends AbstractPolicyBasedAuthorizer {

-private static final Logger logger = LoggerFactory.getLogger(FileAuthorizer.class);
-
private static final String FILE_USER_GROUP_PROVIDER_ID = "file-user-group-provider";
private static final String FILE_ACCESS_POLICY_PROVIDER_ID = "file-access-policy-provider";

@@ -186,7 +186,6 @@ public class AuthorizationService {

private ResourcePermissions getTopLevelPermissions() {

-NiFiUser user = NiFiUserUtils.getNiFiUser();
ResourcePermissions resourcePermissions = new ResourcePermissions();

final Permissions bucketsPermissions = getPermissionsForResource(authorizableLookup.getBucketsAuthorizable());

@@ -482,7 +481,6 @@ public class AuthorizationService {
getAuthorizableResources(resourceType)
.stream()
.filter(resource -> {
-String resourceId = resource.getIdentifier();
try {
authorizableLookup
.getAuthorizableByResource(resource.getIdentifier())

@@ -434,12 +434,6 @@ public class HtmlExtensionDocWriter implements ExtensionDocWriter {

writeValidValueDescription(xmlStreamWriter, group + "-" + artifact + "-" + version);

-// xmlStreamWriter.writeEmptyElement("br");
-// xmlStreamWriter.writeCharacters(group);
-// xmlStreamWriter.writeEmptyElement("br");
-// xmlStreamWriter.writeCharacters(artifact);
-// xmlStreamWriter.writeEmptyElement("br");
-// xmlStreamWriter.writeCharacters(version);
}
}

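For context on the API the deleted lines were calling: XMLStreamWriter.writeEmptyElement emits a self-closing tag and writeCharacters emits a text node, so the dead block would have rendered the group, artifact, and version separated by <br/> tags. A standalone sketch, illustrative rather than NiFi code:

    import javax.xml.stream.XMLOutputFactory;
    import javax.xml.stream.XMLStreamException;
    import javax.xml.stream.XMLStreamWriter;
    import java.io.StringWriter;

    final class BrSeparatedCoordinates {
        public static void main(final String[] args) throws XMLStreamException {
            final StringWriter out = new StringWriter();
            final XMLStreamWriter writer = XMLOutputFactory.newFactory().createXMLStreamWriter(out);

            writer.writeStartElement("td");
            // Each pair mirrors the deleted lines: a <br/> separator, then a text node.
            for (final String value : new String[]{"group", "artifact", "version"}) {
                writer.writeEmptyElement("br");
                writer.writeCharacters(value);
            }
            writer.writeEndElement();
            writer.close();

            System.out.println(out); // <td><br/>group<br/>artifact<br/>version</td>
        }
    }
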
@@ -65,9 +65,6 @@ public class TestFlowContentSerializer {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
serializer.serializeFlowContent(flowContent, out);

-//final String json = new String(out.toByteArray(), StandardCharsets.UTF_8);
-//System.out.println(json);
-
final ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());

// make sure we can read the version from the input stream and it should be the current version

@@ -55,8 +55,6 @@ public class CryptoUtils {
* null if max key length cannot be determined for any known Cipher transformations */
public static Boolean isCryptoRestricted() {

-Boolean isCryptoRestricted = null;
-
for (String transformation : standardCryptoTransformations) {
try {
return Cipher.getMaxAllowedKeyLength(transformation) < Integer.MAX_VALUE;

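For context on the surviving logic: Cipher.getMaxAllowedKeyLength reports the key-size ceiling the installed JCE policy permits for a transformation, and an unrestricted JVM returns Integer.MAX_VALUE, which is exactly what the < comparison tests. A standalone sketch follows; the transformation list is an assumption, since the real list lives in standardCryptoTransformations.

    import javax.crypto.Cipher;
    import java.security.NoSuchAlgorithmException;

    final class CryptoPolicyCheck {
        // A couple of common transformations; the actual list is assumed here.
        private static final String[] TRANSFORMATIONS = {"AES/CBC/PKCS5Padding", "AES"};

        public static void main(final String[] args) throws NoSuchAlgorithmException {
            for (final String transformation : TRANSFORMATIONS) {
                // Unrestricted policy files report Integer.MAX_VALUE, so anything
                // lower means the JVM is crypto-restricted for this transformation.
                final int maxKeyLength = Cipher.getMaxAllowedKeyLength(transformation);
                System.out.println(transformation + " max key length: " + maxKeyLength
                        + " restricted: " + (maxKeyLength < Integer.MAX_VALUE));
            }
        }
    }
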
@@ -21,7 +21,6 @@
<artifactId>nifi-registry-core</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi.registry</groupId>
<artifactId>nifi-registry-web-ui</artifactId>
-<version>2.0.0-SNAPSHOT</version>
<packaging>war</packaging>

@@ -207,7 +207,6 @@ public class ComponentBuilder {

public ParameterProviderNode buildParameterProvider() throws ParameterProviderInstantiationException {
final LoggableComponent<ParameterProvider> parameterProviderComponent = createLoggableParameterProvider();
-final ProcessScheduler processScheduler = statelessEngine.getProcessScheduler();
final ControllerServiceProvider controllerServiceProvider = statelessEngine.getControllerServiceProvider();
final ReloadComponent reloadComponent = statelessEngine.getReloadComponent();
final ExtensionManager extensionManager = statelessEngine.getExtensionManager();

@@ -87,7 +87,6 @@ public class FlowFileRestorationIT extends NiFiSystemIT {
}

private byte[] getFlowFileContents(final String connectionId, final int flowFileIndex) throws IOException, NiFiClientException {
-final byte[] flowFileContents;
try (final InputStream in = getClientUtil().getFlowFileContent(connectionId, flowFileIndex);
final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {

@@ -225,12 +225,6 @@ public abstract class AbstractCommand<R extends Result> implements Command<R> {
return getContext().isInteractive();
}

-protected void printIfInteractive(final String val) {
-if (isInteractive()) {
-print(val);
-}
-}
-
protected void printlnIfInteractive(final String val) {
if (isInteractive()) {
println(val);

@@ -55,15 +55,6 @@ public class GetRegistryClientId extends AbstractNiFiCommand<RegistryClientIDRes
public RegistryClientIDResult doExecute(final NiFiClient client, final Properties properties)
throws NiFiClientException, IOException, CommandException {
final String regClientName = getArg(properties, CommandOption.REGISTRY_CLIENT_NAME);
-//final String regClientUrl = getArg(properties, CommandOption.REGISTRY_CLIENT_URL);
-
-// if (!StringUtils.isBlank(regClientName) && !StringUtils.isBlank(regClientUrl)) {
-// throw new CommandException("Name and URL cannot be specified at the same time");
-// }
-//
-// if (StringUtils.isBlank(regClientName) && StringUtils.isBlank(regClientUrl)) {
-// throw new CommandException("Name or URL must be specified");
-// }

final FlowRegistryClientsEntity registries = client.getControllerClient().getRegistryClients();

@@ -78,7 +69,6 @@ public class GetRegistryClientId extends AbstractNiFiCommand<RegistryClientIDRes
} else {
registry = registries.getRegistries().stream()
.map(r -> r.getComponent())
-// .filter(r -> r.getUri().equalsIgnoreCase(regClientUrl.trim()))
.findFirst()
.orElse(null);
}

@@ -78,10 +78,6 @@ public class UpdateRegistryClient extends AbstractNiFiCommand<VoidResult> {
existingRegClient.getComponent().setName(name);
}

-// if (StringUtils.isNotBlank(url)) {
-// existingRegClient.getComponent().setUri(url);
-// }
-
if (StringUtils.isNotBlank(desc)) {
existingRegClient.getComponent().setDescription(desc);
}