mirror of https://github.com/apache/nifi.git
NIFI-7200: Revert "NIFI-6530 - HTTP SiteToSite server returns 201 in case no data is available"
This reverts commit f01668e66a
.
Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
parent
7b0ae56a2c
commit
afad982e91
|
@ -224,7 +224,7 @@ public class PeerSelector {
|
||||||
* for RECEIVE, a peer with more flow files is preferred
|
* for RECEIVE, a peer with more flow files is preferred
|
||||||
* @return a selected peer, if there is no available peer or all peers are penalized, then return null
|
* @return a selected peer, if there is no available peer or all peers are penalized, then return null
|
||||||
*/
|
*/
|
||||||
public ArrayList<PeerStatus> getPeerStatuses(final TransferDirection direction) {
|
public PeerStatus getNextPeerStatus(final TransferDirection direction) {
|
||||||
List<PeerStatus> peerList = peerStatuses;
|
List<PeerStatus> peerList = peerStatuses;
|
||||||
if (isPeerRefreshNeeded(peerList)) {
|
if (isPeerRefreshNeeded(peerList)) {
|
||||||
peerRefreshLock.lock();
|
peerRefreshLock.lock();
|
||||||
|
@ -251,15 +251,25 @@ public class PeerSelector {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (peerList == null || peerList.isEmpty()) {
|
if (peerList == null || peerList.isEmpty()) {
|
||||||
return new ArrayList<PeerStatus>();
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
ArrayList<PeerStatus> retVal = new ArrayList<>(peerList);
|
PeerStatus peerStatus;
|
||||||
retVal.removeIf(p -> isPenalized(p));
|
for (int i = 0; i < peerList.size(); i++) {
|
||||||
|
final long idx = peerIndex.getAndIncrement();
|
||||||
|
final int listIndex = (int) (idx % peerList.size());
|
||||||
|
peerStatus = peerList.get(listIndex);
|
||||||
|
|
||||||
return retVal;
|
if (isPenalized(peerStatus)) {
|
||||||
|
logger.debug("{} {} is penalized; will not communicate with this peer", this, peerStatus);
|
||||||
|
} else {
|
||||||
|
return peerStatus;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.debug("{} All peers appear to be penalized; returning null", this);
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<PeerStatus> createPeerStatusList(final TransferDirection direction) throws IOException {
|
private List<PeerStatus> createPeerStatusList(final TransferDirection direction) throws IOException {
|
||||||
|
|
|
@ -27,8 +27,6 @@ import org.apache.nifi.remote.client.PeerSelector;
|
||||||
import org.apache.nifi.remote.client.PeerStatusProvider;
|
import org.apache.nifi.remote.client.PeerStatusProvider;
|
||||||
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
|
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
|
||||||
import org.apache.nifi.remote.exception.HandshakeException;
|
import org.apache.nifi.remote.exception.HandshakeException;
|
||||||
import org.apache.nifi.remote.exception.NoContentException;
|
|
||||||
import org.apache.nifi.remote.exception.NoValidPeerException;
|
|
||||||
import org.apache.nifi.remote.exception.PortNotRunningException;
|
import org.apache.nifi.remote.exception.PortNotRunningException;
|
||||||
import org.apache.nifi.remote.exception.ProtocolException;
|
import org.apache.nifi.remote.exception.ProtocolException;
|
||||||
import org.apache.nifi.remote.exception.UnknownPortException;
|
import org.apache.nifi.remote.exception.UnknownPortException;
|
||||||
|
@ -43,7 +41,6 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -128,11 +125,9 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
|
||||||
@Override
|
@Override
|
||||||
public Transaction createTransaction(final TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException {
|
public Transaction createTransaction(final TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException {
|
||||||
final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS);
|
final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS);
|
||||||
Integer peersWithNoContent = 0;
|
|
||||||
|
|
||||||
ArrayList<PeerStatus> peers = peerSelector.getPeerStatuses(direction);
|
PeerStatus peerStatus;
|
||||||
|
while ((peerStatus = peerSelector.getNextPeerStatus(direction)) != null) {
|
||||||
for (PeerStatus peerStatus : peers) {
|
|
||||||
logger.debug("peerStatus={}", peerStatus);
|
logger.debug("peerStatus={}", peerStatus);
|
||||||
|
|
||||||
final CommunicationsSession commSession = new HttpCommunicationsSession();
|
final CommunicationsSession commSession = new HttpCommunicationsSession();
|
||||||
|
@ -174,11 +169,6 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
|
||||||
try {
|
try {
|
||||||
transactionUrl = apiClient.initiateTransaction(direction, portId);
|
transactionUrl = apiClient.initiateTransaction(direction, portId);
|
||||||
commSession.setUserDn(apiClient.getTrustedPeerDn());
|
commSession.setUserDn(apiClient.getTrustedPeerDn());
|
||||||
} catch (final NoContentException e) {
|
|
||||||
apiClient.close();
|
|
||||||
peersWithNoContent++;
|
|
||||||
logger.debug("Peer {} has no flowfiles to provide", peer);
|
|
||||||
continue;
|
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
apiClient.close();
|
apiClient.close();
|
||||||
logger.warn("Penalizing a peer {} due to {}", peer, e.toString());
|
logger.warn("Penalizing a peer {} due to {}", peer, e.toString());
|
||||||
|
@ -221,12 +211,8 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
|
||||||
return transaction;
|
return transaction;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(peersWithNoContent > 0) {
|
logger.info("Couldn't find a valid peer to communicate with.");
|
||||||
return null;
|
return null;
|
||||||
}
|
|
||||||
String error = new String("Couldn't find a valid peer to communicate with.");
|
|
||||||
logger.info(error);
|
|
||||||
throw new NoValidPeerException(error);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private String resolveNodeApiUrl(final PeerDescription description) {
|
private String resolveNodeApiUrl(final PeerDescription description) {
|
||||||
|
|
|
@ -32,7 +32,6 @@ public class TransportProtocolVersionNegotiator extends StandardVersionNegotiato
|
||||||
public int getTransactionProtocolVersion() {
|
public int getTransactionProtocolVersion() {
|
||||||
switch (getVersion()) {
|
switch (getVersion()) {
|
||||||
case 1:
|
case 1:
|
||||||
case 2:
|
|
||||||
return 5;
|
return 5;
|
||||||
default:
|
default:
|
||||||
throw new RuntimeException("Transport protocol version " + getVersion()
|
throw new RuntimeException("Transport protocol version " + getVersion()
|
||||||
|
|
|
@ -30,7 +30,6 @@ import org.apache.nifi.remote.client.SiteInfoProvider;
|
||||||
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
|
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
|
||||||
import org.apache.nifi.remote.codec.FlowFileCodec;
|
import org.apache.nifi.remote.codec.FlowFileCodec;
|
||||||
import org.apache.nifi.remote.exception.HandshakeException;
|
import org.apache.nifi.remote.exception.HandshakeException;
|
||||||
import org.apache.nifi.remote.exception.NoValidPeerException;
|
|
||||||
import org.apache.nifi.remote.exception.PortNotRunningException;
|
import org.apache.nifi.remote.exception.PortNotRunningException;
|
||||||
import org.apache.nifi.remote.exception.TransmissionDisabledException;
|
import org.apache.nifi.remote.exception.TransmissionDisabledException;
|
||||||
import org.apache.nifi.remote.exception.UnknownPortException;
|
import org.apache.nifi.remote.exception.UnknownPortException;
|
||||||
|
@ -166,9 +165,14 @@ public class EndpointConnectionPool implements PeerStatusProvider {
|
||||||
throw new UnreachableClusterException("Unable to refresh details from any of the configured remote instances.", ioe);
|
throw new UnreachableClusterException("Unable to refresh details from any of the configured remote instances.", ioe);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (PeerStatus peerStatus : peerSelector.getPeerStatuses(direction)) {
|
do {
|
||||||
final List<EndpointConnection> addBack = new ArrayList<>();
|
final List<EndpointConnection> addBack = new ArrayList<>();
|
||||||
|
logger.debug("{} getting next peer status", this);
|
||||||
|
final PeerStatus peerStatus = peerSelector.getNextPeerStatus(direction);
|
||||||
logger.debug("{} next peer status = {}", this, peerStatus);
|
logger.debug("{} next peer status = {}", this, peerStatus);
|
||||||
|
if (peerStatus == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
final PeerDescription peerDescription = peerStatus.getPeerDescription();
|
final PeerDescription peerDescription = peerStatus.getPeerDescription();
|
||||||
BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peerDescription);
|
BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peerDescription);
|
||||||
|
@ -188,7 +192,7 @@ public class EndpointConnectionPool implements PeerStatusProvider {
|
||||||
if (connection == null && !addBack.isEmpty()) {
|
if (connection == null && !addBack.isEmpty()) {
|
||||||
// all available connections have been penalized.
|
// all available connections have been penalized.
|
||||||
logger.debug("{} all Connections for {} are penalized; returning no Connection", this, portId);
|
logger.debug("{} all Connections for {} are penalized; returning no Connection", this, portId);
|
||||||
throw new NoValidPeerException("All peers are penalized");
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (connection != null && connection.getPeer().isPenalized(portId)) {
|
if (connection != null && connection.getPeer().isPenalized(portId)) {
|
||||||
|
@ -314,13 +318,10 @@ public class EndpointConnectionPool implements PeerStatusProvider {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if( connection != null && codec != null && commsSession != null && protocol != null) {
|
} while (connection == null || codec == null || commsSession == null || protocol == null);
|
||||||
activeConnections.add(connection);
|
|
||||||
return connection;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
throw new NoValidPeerException("Didn't find any valid peer to connect to");
|
|
||||||
|
|
||||||
|
activeConnections.add(connection);
|
||||||
|
return connection;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean offer(final EndpointConnection endpointConnection) {
|
public boolean offer(final EndpointConnection endpointConnection) {
|
||||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.nifi.remote.TransactionCompletion;
|
||||||
import org.apache.nifi.remote.TransferDirection;
|
import org.apache.nifi.remote.TransferDirection;
|
||||||
import org.apache.nifi.remote.client.AbstractSiteToSiteClient;
|
import org.apache.nifi.remote.client.AbstractSiteToSiteClient;
|
||||||
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
|
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
|
||||||
import org.apache.nifi.remote.exception.NoContentException;
|
|
||||||
import org.apache.nifi.remote.protocol.DataPacket;
|
import org.apache.nifi.remote.protocol.DataPacket;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -126,13 +125,14 @@ public class SocketClient extends AbstractSiteToSiteClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
final EndpointConnection connectionState = pool.getEndpointConnection(direction, getConfig());
|
final EndpointConnection connectionState = pool.getEndpointConnection(direction, getConfig());
|
||||||
|
if (connectionState == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
final Transaction transaction;
|
final Transaction transaction;
|
||||||
try {
|
try {
|
||||||
transaction = connectionState.getSocketClientProtocol().startTransaction(
|
transaction = connectionState.getSocketClientProtocol().startTransaction(
|
||||||
connectionState.getPeer(), connectionState.getCodec(), direction);
|
connectionState.getPeer(), connectionState.getCodec(), direction);
|
||||||
} catch (final NoContentException e) {
|
|
||||||
return null;
|
|
||||||
} catch (final Throwable t) {
|
} catch (final Throwable t) {
|
||||||
pool.terminate(connectionState);
|
pool.terminate(connectionState);
|
||||||
throw new IOException("Unable to create Transaction to communicate with " + connectionState.getPeer(), t);
|
throw new IOException("Unable to create Transaction to communicate with " + connectionState.getPeer(), t);
|
||||||
|
|
|
@ -1,39 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.nifi.remote.exception;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A NoContentException occurs when the remote peer has no flowfiles to provide
|
|
||||||
*/
|
|
||||||
public class NoContentException extends IOException {
|
|
||||||
|
|
||||||
private static final long serialVersionUID = -689032011082690815L;
|
|
||||||
|
|
||||||
public NoContentException(final String message, final Throwable cause) {
|
|
||||||
super(message, cause);
|
|
||||||
}
|
|
||||||
|
|
||||||
public NoContentException(final String message) {
|
|
||||||
super(message);
|
|
||||||
}
|
|
||||||
|
|
||||||
public NoContentException(final Throwable cause) {
|
|
||||||
super(cause);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,40 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.nifi.remote.exception;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A NoValidPeerException occurs when all the remote peers are penalized or none exists
|
|
||||||
*/
|
|
||||||
public class NoValidPeerException extends IOException {
|
|
||||||
|
|
||||||
private static final long serialVersionUID = 8421102798129193880L;
|
|
||||||
|
|
||||||
public NoValidPeerException(final String message, final Throwable cause) {
|
|
||||||
super(message, cause);
|
|
||||||
}
|
|
||||||
|
|
||||||
public NoValidPeerException(final String message) {
|
|
||||||
super(message);
|
|
||||||
}
|
|
||||||
|
|
||||||
public NoValidPeerException(final Throwable cause) {
|
|
||||||
super(cause);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -21,7 +21,6 @@ import org.apache.nifi.remote.AbstractTransaction;
|
||||||
import org.apache.nifi.remote.Peer;
|
import org.apache.nifi.remote.Peer;
|
||||||
import org.apache.nifi.remote.TransferDirection;
|
import org.apache.nifi.remote.TransferDirection;
|
||||||
import org.apache.nifi.remote.codec.FlowFileCodec;
|
import org.apache.nifi.remote.codec.FlowFileCodec;
|
||||||
import org.apache.nifi.remote.exception.NoContentException;
|
|
||||||
import org.apache.nifi.remote.exception.ProtocolException;
|
import org.apache.nifi.remote.exception.ProtocolException;
|
||||||
import org.apache.nifi.remote.protocol.RequestType;
|
import org.apache.nifi.remote.protocol.RequestType;
|
||||||
import org.apache.nifi.remote.protocol.Response;
|
import org.apache.nifi.remote.protocol.Response;
|
||||||
|
@ -44,9 +43,6 @@ public class SocketClientTransaction extends AbstractTransaction {
|
||||||
this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
|
this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
|
||||||
|
|
||||||
initialize();
|
initialize();
|
||||||
if (direction == TransferDirection.RECEIVE && !this.dataAvailable){
|
|
||||||
throw new NoContentException("Remote side has no flowfiles to provide");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initialize() throws IOException {
|
private void initialize() throws IOException {
|
||||||
|
|
|
@ -64,7 +64,6 @@ import org.apache.nifi.remote.Peer;
|
||||||
import org.apache.nifi.remote.TransferDirection;
|
import org.apache.nifi.remote.TransferDirection;
|
||||||
import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator;
|
import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator;
|
||||||
import org.apache.nifi.remote.exception.HandshakeException;
|
import org.apache.nifi.remote.exception.HandshakeException;
|
||||||
import org.apache.nifi.remote.exception.NoContentException;
|
|
||||||
import org.apache.nifi.remote.exception.PortNotRunningException;
|
import org.apache.nifi.remote.exception.PortNotRunningException;
|
||||||
import org.apache.nifi.remote.exception.ProtocolException;
|
import org.apache.nifi.remote.exception.ProtocolException;
|
||||||
import org.apache.nifi.remote.exception.UnknownPortException;
|
import org.apache.nifi.remote.exception.UnknownPortException;
|
||||||
|
@ -148,7 +147,6 @@ public class SiteToSiteRestApiClient implements Closeable {
|
||||||
private static final int RESPONSE_CODE_OK = 200;
|
private static final int RESPONSE_CODE_OK = 200;
|
||||||
private static final int RESPONSE_CODE_CREATED = 201;
|
private static final int RESPONSE_CODE_CREATED = 201;
|
||||||
private static final int RESPONSE_CODE_ACCEPTED = 202;
|
private static final int RESPONSE_CODE_ACCEPTED = 202;
|
||||||
private static final int RESPONSE_CODE_NO_CONTENT = 204;
|
|
||||||
private static final int RESPONSE_CODE_BAD_REQUEST = 400;
|
private static final int RESPONSE_CODE_BAD_REQUEST = 400;
|
||||||
private static final int RESPONSE_CODE_FORBIDDEN = 403;
|
private static final int RESPONSE_CODE_FORBIDDEN = 403;
|
||||||
private static final int RESPONSE_CODE_NOT_FOUND = 404;
|
private static final int RESPONSE_CODE_NOT_FOUND = 404;
|
||||||
|
@ -173,7 +171,7 @@ public class SiteToSiteRestApiClient implements Closeable {
|
||||||
private int batchCount = 0;
|
private int batchCount = 0;
|
||||||
private long batchSize = 0;
|
private long batchSize = 0;
|
||||||
private long batchDurationMillis = 0;
|
private long batchDurationMillis = 0;
|
||||||
private TransportProtocolVersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(2,1);
|
private TransportProtocolVersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(1);
|
||||||
|
|
||||||
private String trustedPeerDn;
|
private String trustedPeerDn;
|
||||||
private final ScheduledExecutorService ttlExtendTaskExecutor;
|
private final ScheduledExecutorService ttlExtendTaskExecutor;
|
||||||
|
@ -500,8 +498,6 @@ public class SiteToSiteRestApiClient implements Closeable {
|
||||||
}
|
}
|
||||||
serverTransactionTtl = Integer.parseInt(serverTransactionTtlHeader.getValue());
|
serverTransactionTtl = Integer.parseInt(serverTransactionTtlHeader.getValue());
|
||||||
break;
|
break;
|
||||||
case RESPONSE_CODE_NO_CONTENT:
|
|
||||||
throw new NoContentException("Server has no flowfiles to provide");
|
|
||||||
|
|
||||||
default:
|
default:
|
||||||
try (InputStream content = response.getEntity().getContent()) {
|
try (InputStream content = response.getEntity().getContent()) {
|
||||||
|
|
|
@ -35,7 +35,6 @@ import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -50,6 +49,8 @@ import static java.util.stream.Collectors.groupingBy;
|
||||||
import static java.util.stream.Collectors.reducing;
|
import static java.util.stream.Collectors.reducing;
|
||||||
import static java.util.stream.Collectors.toMap;
|
import static java.util.stream.Collectors.toMap;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
@ -227,13 +228,10 @@ public class TestPeerSelector {
|
||||||
throw new IOException("Connection refused. " + peerFetchStatusesFrom + " is not running.");
|
throw new IOException("Connection refused. " + peerFetchStatusesFrom + " is not running.");
|
||||||
}).when(peerStatusProvider).fetchRemotePeerStatuses(any(PeerDescription.class));
|
}).when(peerStatusProvider).fetchRemotePeerStatuses(any(PeerDescription.class));
|
||||||
|
|
||||||
|
|
||||||
ArrayList<PeerStatus> peers;
|
|
||||||
|
|
||||||
// 1st attempt. It uses the bootstrap node.
|
// 1st attempt. It uses the bootstrap node.
|
||||||
peerSelector.refreshPeers();
|
peerSelector.refreshPeers();
|
||||||
peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE);
|
PeerStatus peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
|
||||||
assert(!peers.isEmpty());
|
assertNotNull(peerStatus);
|
||||||
|
|
||||||
// Proceed time so that peer selector refresh statuses.
|
// Proceed time so that peer selector refresh statuses.
|
||||||
peerStatuses.remove(bootstrapNodeStatus);
|
peerStatuses.remove(bootstrapNodeStatus);
|
||||||
|
@ -241,35 +239,34 @@ public class TestPeerSelector {
|
||||||
|
|
||||||
// 2nd attempt.
|
// 2nd attempt.
|
||||||
peerSelector.refreshPeers();
|
peerSelector.refreshPeers();
|
||||||
peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE);
|
peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
|
||||||
assert(!peers.isEmpty());
|
assertNotNull(peerStatus);
|
||||||
assertEquals("Node2 should be returned since node 2 is the only available node.", node2, peers.get(0).getPeerDescription());
|
assertEquals("Node2 should be returned since node 2 is the only available node.", node2, peerStatus.getPeerDescription());
|
||||||
|
|
||||||
// Proceed time so that peer selector refresh statuses.
|
// Proceed time so that peer selector refresh statuses.
|
||||||
systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1;
|
systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1;
|
||||||
|
|
||||||
// 3rd attempt.
|
// 3rd attempt.
|
||||||
peerSelector.refreshPeers();
|
peerSelector.refreshPeers();
|
||||||
peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE);
|
peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
|
||||||
assert(!peers.isEmpty());
|
assertNotNull(peerStatus);
|
||||||
assertEquals("Node2 should be returned since node 2 is the only available node.", node2, peers.get(0).getPeerDescription());
|
assertEquals("Node2 should be returned since node 2 is the only available node.", node2, peerStatus.getPeerDescription());
|
||||||
|
|
||||||
// Remove node2 to simulate that it goes down. There's no available node at this point.
|
// Remove node2 to simulate that it goes down. There's no available node at this point.
|
||||||
peerStatuses.remove(node2Status);
|
peerStatuses.remove(node2Status);
|
||||||
systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1;
|
systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1;
|
||||||
|
|
||||||
peerSelector.refreshPeers();
|
peerSelector.refreshPeers();
|
||||||
peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE);
|
peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
|
||||||
assertTrue("PeerSelector should return an empty list as next peer statuses, since there's no available peer", peers.isEmpty());
|
assertNull("PeerSelector should return null as next peer status, since there's no available peer", peerStatus);
|
||||||
|
|
||||||
// Add node1 back. PeerSelector should be able to fetch peer statuses because it always tries to fetch at least from the bootstrap node.
|
// Add node1 back. PeerSelector should be able to fetch peer statuses because it always tries to fetch at least from the bootstrap node.
|
||||||
peerStatuses.add(bootstrapNodeStatus);
|
peerStatuses.add(bootstrapNodeStatus);
|
||||||
systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1;
|
systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1;
|
||||||
|
|
||||||
peerSelector.refreshPeers();
|
peerSelector.refreshPeers();
|
||||||
peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE);
|
peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
|
||||||
assert(!peers.isEmpty());
|
assertEquals("Node1 should be returned since node 1 is the only available node.", bootstrapNode, peerStatus.getPeerDescription());
|
||||||
assertEquals("Node1 should be returned since node 1 is the only available node.", bootstrapNode, peers.get(0).getPeerDescription());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -57,7 +57,6 @@ import org.apache.nifi.remote.client.KeystoreType;
|
||||||
import org.apache.nifi.remote.client.SiteToSiteClient;
|
import org.apache.nifi.remote.client.SiteToSiteClient;
|
||||||
import org.apache.nifi.remote.codec.StandardFlowFileCodec;
|
import org.apache.nifi.remote.codec.StandardFlowFileCodec;
|
||||||
import org.apache.nifi.remote.exception.HandshakeException;
|
import org.apache.nifi.remote.exception.HandshakeException;
|
||||||
import org.apache.nifi.remote.exception.NoValidPeerException;
|
|
||||||
import org.apache.nifi.remote.io.CompressionInputStream;
|
import org.apache.nifi.remote.io.CompressionInputStream;
|
||||||
import org.apache.nifi.remote.io.CompressionOutputStream;
|
import org.apache.nifi.remote.io.CompressionOutputStream;
|
||||||
import org.apache.nifi.remote.protocol.DataPacket;
|
import org.apache.nifi.remote.protocol.DataPacket;
|
||||||
|
@ -198,21 +197,6 @@ public class TestHttpClient {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class EmptyPortTransactionsServlet extends PortTransactionsServlet {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
|
|
||||||
|
|
||||||
final int reqProtocolVersion = getReqProtocolVersion(req);
|
|
||||||
if (reqProtocolVersion == 1) {
|
|
||||||
super.doPost(req, resp);
|
|
||||||
} else {
|
|
||||||
respondWithText(resp, "No flowfiles available", 204);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class PortTransactionsAccessDeniedServlet extends HttpServlet {
|
public static class PortTransactionsAccessDeniedServlet extends HttpServlet {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -513,8 +497,6 @@ public class TestHttpClient {
|
||||||
servletHandler.addServletWithMapping(OutputPortTransactionServlet.class, "/data-transfer/output-ports/output-timeout-data-ex-id/transactions/transaction-id");
|
servletHandler.addServletWithMapping(OutputPortTransactionServlet.class, "/data-transfer/output-ports/output-timeout-data-ex-id/transactions/transaction-id");
|
||||||
servletHandler.addServletWithMapping(FlowFilesTimeoutAfterDataExchangeServlet.class, "/data-transfer/output-ports/output-timeout-data-ex-id/transactions/transaction-id/flow-files");
|
servletHandler.addServletWithMapping(FlowFilesTimeoutAfterDataExchangeServlet.class, "/data-transfer/output-ports/output-timeout-data-ex-id/transactions/transaction-id/flow-files");
|
||||||
|
|
||||||
servletHandler.addServletWithMapping(EmptyPortTransactionsServlet.class,"/data-transfer/output-ports/empty-output-running-id/transactions");
|
|
||||||
|
|
||||||
server.start();
|
server.start();
|
||||||
|
|
||||||
logger.info("Starting server on port {} for HTTP, and {} for HTTPS", httpConnector.getLocalPort(), sslConnector.getLocalPort());
|
logger.info("Starting server on port {} for HTTP, and {} for HTTPS", httpConnector.getLocalPort(), sslConnector.getLocalPort());
|
||||||
|
@ -674,13 +656,6 @@ public class TestHttpClient {
|
||||||
runningOutputPort.setState(ScheduledState.RUNNING.name());
|
runningOutputPort.setState(ScheduledState.RUNNING.name());
|
||||||
outputPorts.add(runningOutputPort);
|
outputPorts.add(runningOutputPort);
|
||||||
|
|
||||||
final PortDTO emptyRunningOutputPort = new PortDTO();
|
|
||||||
emptyRunningOutputPort.setName("empty-output-running");
|
|
||||||
emptyRunningOutputPort.setId("empty-output-running-id");
|
|
||||||
emptyRunningOutputPort.setType("OUTPUT_PORT");
|
|
||||||
emptyRunningOutputPort.setState(ScheduledState.RUNNING.name());
|
|
||||||
outputPorts.add(emptyRunningOutputPort);
|
|
||||||
|
|
||||||
final PortDTO timeoutOutputPort = new PortDTO();
|
final PortDTO timeoutOutputPort = new PortDTO();
|
||||||
timeoutOutputPort.setName("output-timeout");
|
timeoutOutputPort.setName("output-timeout");
|
||||||
timeoutOutputPort.setId("output-timeout-id");
|
timeoutOutputPort.setId("output-timeout-id");
|
||||||
|
@ -743,10 +718,9 @@ public class TestHttpClient {
|
||||||
.build()
|
.build()
|
||||||
) {
|
) {
|
||||||
final Transaction transaction = client.createTransaction(TransferDirection.SEND);
|
final Transaction transaction = client.createTransaction(TransferDirection.SEND);
|
||||||
fail();
|
|
||||||
|
|
||||||
} catch (final NoValidPeerException e) {
|
assertNull(transaction);
|
||||||
assertNotNull(e.getMessage());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -763,10 +737,9 @@ public class TestHttpClient {
|
||||||
.build()
|
.build()
|
||||||
) {
|
) {
|
||||||
final Transaction transaction = client.createTransaction(TransferDirection.SEND);
|
final Transaction transaction = client.createTransaction(TransferDirection.SEND);
|
||||||
fail();
|
|
||||||
|
|
||||||
} catch (final NoValidPeerException e) {
|
assertNull(transaction);
|
||||||
assertNotNull(e.getMessage());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -782,11 +755,11 @@ public class TestHttpClient {
|
||||||
.build()
|
.build()
|
||||||
) {
|
) {
|
||||||
final Transaction transaction = client.createTransaction(TransferDirection.SEND);
|
final Transaction transaction = client.createTransaction(TransferDirection.SEND);
|
||||||
fail();
|
|
||||||
|
|
||||||
} catch (final NoValidPeerException e) {
|
assertNull(transaction);
|
||||||
assertNotNull(e.getMessage());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -881,10 +854,7 @@ public class TestHttpClient {
|
||||||
.build()
|
.build()
|
||||||
) {
|
) {
|
||||||
final Transaction transaction = client.createTransaction(TransferDirection.SEND);
|
final Transaction transaction = client.createTransaction(TransferDirection.SEND);
|
||||||
fail();
|
assertNull("createTransaction should fail at peer selection and return null.", transaction);
|
||||||
|
|
||||||
} catch (final NoValidPeerException e) {
|
|
||||||
assertNotNull("createTransaction should fail at peer selection and return null.", e.getMessage());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1254,23 +1224,6 @@ public class TestHttpClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testReceiveEmptyPort() throws Exception {
|
|
||||||
|
|
||||||
try (
|
|
||||||
SiteToSiteClient client = getDefaultBuilder()
|
|
||||||
.portName("empty-output-running")
|
|
||||||
.build()
|
|
||||||
) {
|
|
||||||
try {
|
|
||||||
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
|
|
||||||
assertNull(transaction);
|
|
||||||
} catch (IOException e) {
|
|
||||||
fail();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void testReceive(SiteToSiteClient client) throws IOException {
|
private void testReceive(SiteToSiteClient client) throws IOException {
|
||||||
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
|
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.nifi.remote.Transaction;
|
||||||
import org.apache.nifi.remote.TransferDirection;
|
import org.apache.nifi.remote.TransferDirection;
|
||||||
import org.apache.nifi.remote.codec.FlowFileCodec;
|
import org.apache.nifi.remote.codec.FlowFileCodec;
|
||||||
import org.apache.nifi.remote.codec.StandardFlowFileCodec;
|
import org.apache.nifi.remote.codec.StandardFlowFileCodec;
|
||||||
import org.apache.nifi.remote.exception.NoContentException;
|
|
||||||
import org.apache.nifi.remote.io.socket.SocketCommunicationsSession;
|
import org.apache.nifi.remote.io.socket.SocketCommunicationsSession;
|
||||||
import org.apache.nifi.remote.io.socket.SocketInput;
|
import org.apache.nifi.remote.io.socket.SocketInput;
|
||||||
import org.apache.nifi.remote.io.socket.SocketOutput;
|
import org.apache.nifi.remote.io.socket.SocketOutput;
|
||||||
|
@ -45,6 +44,7 @@ import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.createDataPack
|
||||||
import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveOneFlowFile;
|
import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveOneFlowFile;
|
||||||
import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveTwoFlowFiles;
|
import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveTwoFlowFiles;
|
||||||
import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveWithInvalidChecksum;
|
import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveWithInvalidChecksum;
|
||||||
|
import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveZeroFlowFile;
|
||||||
import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendButDestinationFull;
|
import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendButDestinationFull;
|
||||||
import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendOneFlowFile;
|
import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendOneFlowFile;
|
||||||
import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendTwoFlowFiles;
|
import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendTwoFlowFiles;
|
||||||
|
@ -52,7 +52,6 @@ import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendWithIn
|
||||||
import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendZeroFlowFile;
|
import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendZeroFlowFile;
|
||||||
import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.readContents;
|
import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.readContents;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@ -93,12 +92,14 @@ public class TestSocketClientTransaction {
|
||||||
ByteArrayInputStream bis = new ByteArrayInputStream(serverResponseBos.toByteArray());
|
ByteArrayInputStream bis = new ByteArrayInputStream(serverResponseBos.toByteArray());
|
||||||
ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||||
|
|
||||||
try {
|
SocketClientTransaction transaction = getClientTransaction(bis, bos, TransferDirection.RECEIVE);
|
||||||
SocketClientTransaction transaction = getClientTransaction(bis, bos, TransferDirection.RECEIVE);
|
|
||||||
fail();
|
execReceiveZeroFlowFile(transaction);
|
||||||
} catch (final NoContentException e) {
|
|
||||||
assertEquals("Remote side has no flowfiles to provide", e.getMessage());
|
// Verify what client has sent.
|
||||||
}
|
DataInputStream sentByClient = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
|
||||||
|
assertEquals(RequestType.RECEIVE_FLOWFILES, RequestType.readRequestType(sentByClient));
|
||||||
|
assertEquals(-1, sentByClient.read());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -147,13 +147,6 @@ public class NiFiReceiver extends Receiver<NiFiDataPacket> {
|
||||||
try {
|
try {
|
||||||
while (!isStopped()) {
|
while (!isStopped()) {
|
||||||
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
|
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
|
||||||
if (transaction == null) {
|
|
||||||
try {
|
|
||||||
Thread.sleep(1000L);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
DataPacket dataPacket = transaction.receive();
|
DataPacket dataPacket = transaction.receive();
|
||||||
if (dataPacket == null) {
|
if (dataPacket == null) {
|
||||||
transaction.confirm();
|
transaction.confirm();
|
||||||
|
|
|
@ -36,7 +36,6 @@ import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.processor.io.InputStreamCallback;
|
import org.apache.nifi.processor.io.InputStreamCallback;
|
||||||
import org.apache.nifi.remote.client.SiteToSiteClient;
|
import org.apache.nifi.remote.client.SiteToSiteClient;
|
||||||
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
|
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
|
||||||
import org.apache.nifi.remote.exception.NoValidPeerException;
|
|
||||||
import org.apache.nifi.remote.exception.PortNotRunningException;
|
import org.apache.nifi.remote.exception.PortNotRunningException;
|
||||||
import org.apache.nifi.remote.exception.ProtocolException;
|
import org.apache.nifi.remote.exception.ProtocolException;
|
||||||
import org.apache.nifi.remote.exception.UnknownPortException;
|
import org.apache.nifi.remote.exception.UnknownPortException;
|
||||||
|
@ -223,13 +222,6 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
|
||||||
final Transaction transaction;
|
final Transaction transaction;
|
||||||
try {
|
try {
|
||||||
transaction = client.createTransaction(transferDirection);
|
transaction = client.createTransaction(transferDirection);
|
||||||
} catch (final NoValidPeerException e) {
|
|
||||||
final String message = String.format("%s Unable to create transaction to communicate with; all peers must be penalized, so yielding context", this);
|
|
||||||
logger.debug(message);
|
|
||||||
session.rollback();
|
|
||||||
context.yield();
|
|
||||||
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
|
|
||||||
return;
|
|
||||||
} catch (final PortNotRunningException e) {
|
} catch (final PortNotRunningException e) {
|
||||||
context.yield();
|
context.yield();
|
||||||
this.targetRunning.set(false);
|
this.targetRunning.set(false);
|
||||||
|
@ -265,10 +257,11 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
|
||||||
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
|
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (transaction == null) {
|
if (transaction == null) {
|
||||||
|
logger.debug("{} Unable to create transaction to communicate with; all peers must be penalized, so yielding context", this);
|
||||||
|
session.rollback();
|
||||||
context.yield();
|
context.yield();
|
||||||
final String message = String.format("%s successfully connected to %s, but it has no flowfiles to provide, yielding", this, url);
|
|
||||||
logger.debug(message);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
package org.apache.nifi.stateless.core;
|
package org.apache.nifi.stateless.core;
|
||||||
|
|
||||||
import org.apache.nifi.events.EventReporter;
|
import org.apache.nifi.events.EventReporter;
|
||||||
import org.apache.nifi.remote.exception.NoValidPeerException;
|
|
||||||
import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile;
|
import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile;
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
import org.apache.nifi.processor.DataUnit;
|
import org.apache.nifi.processor.DataUnit;
|
||||||
|
@ -115,8 +114,8 @@ public class StatelessRemoteOutputPort extends AbstractStatelessComponent {
|
||||||
try {
|
try {
|
||||||
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
|
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
|
||||||
if (transaction == null) {
|
if (transaction == null) {
|
||||||
getLogger().debug("No flowfiles to receive");
|
getLogger().error("Unable to create a transaction for Remote Process Group {} to pull from port {}", new Object[]{url, name});
|
||||||
return true;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
final Queue<StatelessFlowFile> destinationQueue = new LinkedList<>();
|
final Queue<StatelessFlowFile> destinationQueue = new LinkedList<>();
|
||||||
|
@ -140,9 +139,6 @@ public class StatelessRemoteOutputPort extends AbstractStatelessComponent {
|
||||||
|
|
||||||
transaction.confirm();
|
transaction.confirm();
|
||||||
transaction.complete();
|
transaction.complete();
|
||||||
} catch (final NoValidPeerException e) {
|
|
||||||
getLogger().error("Unable to create a transaction for Remote Process Group {} to pull from port {}", new Object[]{url, name});
|
|
||||||
return false;
|
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
getLogger().error("Failed to receive FlowFile via site-to-site", e);
|
getLogger().error("Failed to receive FlowFile via site-to-site", e);
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.nifi.authorization.PublicPortAuthorizable;
|
||||||
import org.apache.nifi.authorization.resource.ResourceType;
|
import org.apache.nifi.authorization.resource.ResourceType;
|
||||||
import org.apache.nifi.authorization.user.NiFiUser;
|
import org.apache.nifi.authorization.user.NiFiUser;
|
||||||
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
||||||
import org.apache.nifi.connectable.Connection;
|
|
||||||
import org.apache.nifi.remote.HttpRemoteSiteListener;
|
import org.apache.nifi.remote.HttpRemoteSiteListener;
|
||||||
import org.apache.nifi.remote.Peer;
|
import org.apache.nifi.remote.Peer;
|
||||||
import org.apache.nifi.remote.PeerDescription;
|
import org.apache.nifi.remote.PeerDescription;
|
||||||
|
@ -78,7 +77,6 @@ import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.util.List;
|
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
|
|
||||||
|
@ -92,7 +90,6 @@ import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERT
|
||||||
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
|
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
|
||||||
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION;
|
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION;
|
||||||
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION;
|
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION;
|
||||||
import static org.apache.nifi.remote.protocol.http.HttpHeaders.PROTOCOL_VERSION;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* RESTful endpoint for managing a SiteToSite connection.
|
* RESTful endpoint for managing a SiteToSite connection.
|
||||||
|
@ -208,21 +205,9 @@ public class DataTransferResource extends ApplicationResource {
|
||||||
final int transportProtocolVersion = validationResult.transportProtocolVersion;
|
final int transportProtocolVersion = validationResult.transportProtocolVersion;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(req, peer, transportProtocolVersion);
|
|
||||||
|
|
||||||
int protocolVersion = Integer.parseUnsignedInt(req.getHeader(PROTOCOL_VERSION));
|
|
||||||
|
|
||||||
if ((protocolVersion >= 2) && PORT_TYPE_OUTPUT.equals(portType)) {
|
|
||||||
List<Connection> connectionList = serverProtocol.getPort().getIncomingConnections();
|
|
||||||
if (connectionList.stream().allMatch(c -> c.getFlowFileQueue().isEmpty())) {
|
|
||||||
// Transaction could be created, but there is nothing to transfer. Just return 200.
|
|
||||||
logger.debug("Output port has no flowfiles to transfer, returning 200");
|
|
||||||
transactionManager.cancelTransaction(transactionId);
|
|
||||||
return noCache(Response.status(Response.Status.NO_CONTENT)).type(MediaType.TEXT_PLAIN).entity("No flowfiles available").build();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Execute handshake.
|
// Execute handshake.
|
||||||
|
initiateServerProtocol(req, peer, transportProtocolVersion);
|
||||||
|
|
||||||
TransactionResultEntity entity = new TransactionResultEntity();
|
TransactionResultEntity entity = new TransactionResultEntity();
|
||||||
entity.setResponseCode(ResponseCode.PROPERTIES_OK.getCode());
|
entity.setResponseCode(ResponseCode.PROPERTIES_OK.getCode());
|
||||||
entity.setMessage("Handshake properties are valid, and port is running. A transaction is created:" + transactionId);
|
entity.setMessage("Handshake properties are valid, and port is running. A transaction is created:" + transactionId);
|
||||||
|
|
|
@ -30,7 +30,6 @@ import org.apache.commons.cli.ParseException;
|
||||||
import org.apache.nifi.remote.TransferDirection;
|
import org.apache.nifi.remote.TransferDirection;
|
||||||
import org.apache.nifi.remote.client.KeystoreType;
|
import org.apache.nifi.remote.client.KeystoreType;
|
||||||
import org.apache.nifi.remote.client.SiteToSiteClient;
|
import org.apache.nifi.remote.client.SiteToSiteClient;
|
||||||
import org.apache.nifi.remote.exception.NoContentException;
|
|
||||||
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
|
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
|
||||||
import org.apache.nifi.remote.protocol.http.HttpProxy;
|
import org.apache.nifi.remote.protocol.http.HttpProxy;
|
||||||
import org.apache.nifi.util.FormatUtils;
|
import org.apache.nifi.util.FormatUtils;
|
||||||
|
@ -242,8 +241,6 @@ public class SiteToSiteCliMain {
|
||||||
} else {
|
} else {
|
||||||
new SiteToSiteReceiver(siteToSiteClient, output).receiveFiles();
|
new SiteToSiteReceiver(siteToSiteClient, output).receiveFiles();
|
||||||
}
|
}
|
||||||
} catch (final NoContentException e) {
|
|
||||||
System.out.println("Remote port has no flowfiles");
|
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
printUsage(e.getMessage(), options);
|
printUsage(e.getMessage(), options);
|
||||||
|
|
|
@ -22,7 +22,6 @@ import org.apache.nifi.remote.Transaction;
|
||||||
import org.apache.nifi.remote.TransactionCompletion;
|
import org.apache.nifi.remote.TransactionCompletion;
|
||||||
import org.apache.nifi.remote.TransferDirection;
|
import org.apache.nifi.remote.TransferDirection;
|
||||||
import org.apache.nifi.remote.client.SiteToSiteClient;
|
import org.apache.nifi.remote.client.SiteToSiteClient;
|
||||||
import org.apache.nifi.remote.exception.NoContentException;
|
|
||||||
import org.apache.nifi.remote.protocol.DataPacket;
|
import org.apache.nifi.remote.protocol.DataPacket;
|
||||||
import com.fasterxml.jackson.core.JsonFactory;
|
import com.fasterxml.jackson.core.JsonFactory;
|
||||||
import com.fasterxml.jackson.core.JsonGenerator;
|
import com.fasterxml.jackson.core.JsonGenerator;
|
||||||
|
@ -46,9 +45,6 @@ public class SiteToSiteReceiver {
|
||||||
|
|
||||||
public TransactionCompletion receiveFiles() throws IOException {
|
public TransactionCompletion receiveFiles() throws IOException {
|
||||||
Transaction transaction = siteToSiteClient.createTransaction(TransferDirection.RECEIVE);
|
Transaction transaction = siteToSiteClient.createTransaction(TransferDirection.RECEIVE);
|
||||||
if (transaction == null) {
|
|
||||||
throw new NoContentException("Remote side has no flowfiles to provide");
|
|
||||||
}
|
|
||||||
JsonGenerator jsonGenerator = new JsonFactory().createJsonGenerator(output);
|
JsonGenerator jsonGenerator = new JsonFactory().createJsonGenerator(output);
|
||||||
jsonGenerator.writeStartArray();
|
jsonGenerator.writeStartArray();
|
||||||
DataPacket dataPacket;
|
DataPacket dataPacket;
|
||||||
|
|
Loading…
Reference in New Issue