From f01668e66ad2e45197915769e966a4be27e1592e Mon Sep 17 00:00:00 2001 From: Arpad Boda Date: Mon, 12 Aug 2019 17:30:30 +0200 Subject: [PATCH] NIFI-6530 - HTTP SiteToSite server returns 201 in case no data is available This closes #3647. Signed-off-by: Koji Kawamura --- .../nifi/remote/client/PeerSelector.java | 22 ++----- .../nifi/remote/client/http/HttpClient.java | 22 +++++-- .../TransportProtocolVersionNegotiator.java | 1 + .../client/socket/EndpointConnectionPool.java | 19 +++--- .../remote/client/socket/SocketClient.java | 6 +- .../remote/exception/NoContentException.java | 39 ++++++++++++ .../exception/NoValidPeerException.java | 40 ++++++++++++ .../socket/SocketClientTransaction.java | 4 ++ .../remote/util/SiteToSiteRestApiClient.java | 6 +- .../nifi/remote/client/TestPeerSelector.java | 31 ++++----- .../remote/client/http/TestHttpClient.java | 63 ++++++++++++++++--- .../socket/TestSocketClientTransaction.java | 17 +++-- .../org/apache/nifi/spark/NiFiReceiver.java | 7 +++ .../nifi/remote/StandardRemoteGroupPort.java | 15 +++-- .../nifi/web/api/DataTransferResource.java | 19 +++++- .../core/StatelessRemoteOutputPort.java | 8 ++- .../nifi/toolkit/s2s/SiteToSiteCliMain.java | 3 + .../nifi/toolkit/s2s/SiteToSiteReceiver.java | 4 ++ 18 files changed, 253 insertions(+), 73 deletions(-) create mode 100644 nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoContentException.java create mode 100644 nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoValidPeerException.java diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java index a4439673d2..14c163bace 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java @@ -262,7 +262,7 @@ public class PeerSelector { * for RECEIVE, a peer with more flow files is preferred * @return a selected peer, if there is no available peer or all peers are penalized, then return null */ - public PeerStatus getNextPeerStatus(final TransferDirection direction) { + public ArrayList getPeerStatuses(final TransferDirection direction) { List peerList = peerStatuses; if (isPeerRefreshNeeded(peerList)) { peerRefreshLock.lock(); @@ -289,25 +289,15 @@ public class PeerSelector { } } + if (peerList == null || peerList.isEmpty()) { - return null; + return new ArrayList(); } - PeerStatus peerStatus; - for (int i = 0; i < peerList.size(); i++) { - final long idx = peerIndex.getAndIncrement(); - final int listIndex = (int) (idx % peerList.size()); - peerStatus = peerList.get(listIndex); + ArrayList retVal = new ArrayList<>(peerList); + retVal.removeIf(p -> isPenalized(p)); - if (isPenalized(peerStatus)) { - logger.debug("{} {} is penalized; will not communicate with this peer", this, peerStatus); - } else { - return peerStatus; - } - } - - logger.debug("{} All peers appear to be penalized; returning null", this); - return null; + return retVal; } private List createPeerStatusList(final TransferDirection direction) throws IOException { diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java index 4213dac67a..e1516d2923 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java @@ -27,6 +27,8 @@ import org.apache.nifi.remote.client.PeerSelector; import org.apache.nifi.remote.client.PeerStatusProvider; import org.apache.nifi.remote.client.SiteToSiteClientConfig; import org.apache.nifi.remote.exception.HandshakeException; +import org.apache.nifi.remote.exception.NoContentException; +import org.apache.nifi.remote.exception.NoValidPeerException; import org.apache.nifi.remote.exception.PortNotRunningException; import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.UnknownPortException; @@ -40,6 +42,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -124,9 +127,11 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr @Override public Transaction createTransaction(final TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException { final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS); + Integer peersWithNoContent = 0; - PeerStatus peerStatus; - while ((peerStatus = peerSelector.getNextPeerStatus(direction)) != null) { + ArrayList peers = peerSelector.getPeerStatuses(direction); + + for (PeerStatus peerStatus : peers) { logger.debug("peerStatus={}", peerStatus); final CommunicationsSession commSession = new HttpCommunicationsSession(); @@ -168,6 +173,11 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr try { transactionUrl = apiClient.initiateTransaction(direction, portId); commSession.setUserDn(apiClient.getTrustedPeerDn()); + } catch (final NoContentException e) { + apiClient.close(); + peersWithNoContent++; + logger.debug("Peer {} has no flowfiles to provide", peer); + continue; } catch (final Exception e) { apiClient.close(); logger.warn("Penalizing a peer {} due to {}", peer, e.toString()); @@ -210,8 +220,12 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr return transaction; } - logger.info("Couldn't find a valid peer to communicate with."); - return null; + if(peersWithNoContent > 0) { + return null; + } + String error = new String("Couldn't find a valid peer to communicate with."); + logger.info(error); + throw new NoValidPeerException(error); } private String resolveNodeApiUrl(final PeerDescription description) { diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/TransportProtocolVersionNegotiator.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/TransportProtocolVersionNegotiator.java index d0a636871b..844a92e9a7 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/TransportProtocolVersionNegotiator.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/TransportProtocolVersionNegotiator.java @@ -32,6 +32,7 @@ public class TransportProtocolVersionNegotiator extends StandardVersionNegotiato public int getTransactionProtocolVersion() { switch (getVersion()) { case 1: + case 2: return 5; default: throw new RuntimeException("Transport protocol version " + getVersion() diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java index a360c873de..e9f2536a1e 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java @@ -29,6 +29,7 @@ import org.apache.nifi.remote.client.SiteInfoProvider; import org.apache.nifi.remote.client.SiteToSiteClientConfig; import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.exception.HandshakeException; +import org.apache.nifi.remote.exception.NoValidPeerException; import org.apache.nifi.remote.exception.PortNotRunningException; import org.apache.nifi.remote.exception.TransmissionDisabledException; import org.apache.nifi.remote.exception.UnknownPortException; @@ -165,14 +166,9 @@ public class EndpointConnectionPool implements PeerStatusProvider { throw new UnreachableClusterException("Unable to refresh details from any of the configured remote instances.", ioe); } - do { + for (PeerStatus peerStatus : peerSelector.getPeerStatuses(direction)) { final List addBack = new ArrayList<>(); - logger.debug("{} getting next peer status", this); - final PeerStatus peerStatus = peerSelector.getNextPeerStatus(direction); logger.debug("{} next peer status = {}", this, peerStatus); - if (peerStatus == null) { - return null; - } final PeerDescription peerDescription = peerStatus.getPeerDescription(); BlockingQueue connectionQueue = connectionQueueMap.get(peerDescription); @@ -192,7 +188,7 @@ public class EndpointConnectionPool implements PeerStatusProvider { if (connection == null && !addBack.isEmpty()) { // all available connections have been penalized. logger.debug("{} all Connections for {} are penalized; returning no Connection", this, portId); - return null; + throw new NoValidPeerException("All peers are penalized"); } if (connection != null && connection.getPeer().isPenalized(portId)) { @@ -318,10 +314,13 @@ public class EndpointConnectionPool implements PeerStatusProvider { } } - } while (connection == null || codec == null || commsSession == null || protocol == null); + if( connection != null && codec != null && commsSession != null && protocol != null) { + activeConnections.add(connection); + return connection; + } + } + throw new NoValidPeerException("Didn't find any valid peer to connect to"); - activeConnections.add(connection); - return connection; } public boolean offer(final EndpointConnection endpointConnection) { diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java index ba6839ce43..0999d57fb4 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java @@ -23,6 +23,7 @@ import org.apache.nifi.remote.TransactionCompletion; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.client.AbstractSiteToSiteClient; import org.apache.nifi.remote.client.SiteToSiteClientConfig; +import org.apache.nifi.remote.exception.NoContentException; import org.apache.nifi.remote.protocol.DataPacket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,14 +126,13 @@ public class SocketClient extends AbstractSiteToSiteClient { } final EndpointConnection connectionState = pool.getEndpointConnection(direction, getConfig()); - if (connectionState == null) { - return null; - } final Transaction transaction; try { transaction = connectionState.getSocketClientProtocol().startTransaction( connectionState.getPeer(), connectionState.getCodec(), direction); + } catch (final NoContentException e) { + return null; } catch (final Throwable t) { pool.terminate(connectionState); throw new IOException("Unable to create Transaction to communicate with " + connectionState.getPeer(), t); diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoContentException.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoContentException.java new file mode 100644 index 0000000000..a0dd23d03a --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoContentException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.exception; + +import java.io.IOException; + +/** + * A NoContentException occurs when the remote peer has no flowfiles to provide + */ +public class NoContentException extends IOException { + + private static final long serialVersionUID = -689032011082690815L; + + public NoContentException(final String message, final Throwable cause) { + super(message, cause); + } + + public NoContentException(final String message) { + super(message); + } + + public NoContentException(final Throwable cause) { + super(cause); + } +} \ No newline at end of file diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoValidPeerException.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoValidPeerException.java new file mode 100644 index 0000000000..30a51a0ca6 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoValidPeerException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.exception; + +import java.io.IOException; + + +/** + * A NoValidPeerException occurs when all the remote peers are penalized or none exists + */ +public class NoValidPeerException extends IOException { + + private static final long serialVersionUID = 8421102798129193880L; + + public NoValidPeerException(final String message, final Throwable cause) { + super(message, cause); + } + + public NoValidPeerException(final String message) { + super(message); + } + + public NoValidPeerException(final Throwable cause) { + super(cause); + } +} diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java index e29f045f39..5b1eb1cd13 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java @@ -21,6 +21,7 @@ import org.apache.nifi.remote.AbstractTransaction; import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.codec.FlowFileCodec; +import org.apache.nifi.remote.exception.NoContentException; import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.protocol.RequestType; import org.apache.nifi.remote.protocol.Response; @@ -43,6 +44,9 @@ public class SocketClientTransaction extends AbstractTransaction { this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream()); initialize(); + if (direction == TransferDirection.RECEIVE && !this.dataAvailable){ + throw new NoContentException("Remote side has no flowfiles to provide"); + } } private void initialize() throws IOException { diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java index 249325d44b..32707087cb 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java @@ -64,6 +64,7 @@ import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator; import org.apache.nifi.remote.exception.HandshakeException; +import org.apache.nifi.remote.exception.NoContentException; import org.apache.nifi.remote.exception.PortNotRunningException; import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.UnknownPortException; @@ -147,6 +148,7 @@ public class SiteToSiteRestApiClient implements Closeable { private static final int RESPONSE_CODE_OK = 200; private static final int RESPONSE_CODE_CREATED = 201; private static final int RESPONSE_CODE_ACCEPTED = 202; + private static final int RESPONSE_CODE_NO_CONTENT = 204; private static final int RESPONSE_CODE_BAD_REQUEST = 400; private static final int RESPONSE_CODE_FORBIDDEN = 403; private static final int RESPONSE_CODE_NOT_FOUND = 404; @@ -171,7 +173,7 @@ public class SiteToSiteRestApiClient implements Closeable { private int batchCount = 0; private long batchSize = 0; private long batchDurationMillis = 0; - private TransportProtocolVersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(1); + private TransportProtocolVersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(2,1); private String trustedPeerDn; private final ScheduledExecutorService ttlExtendTaskExecutor; @@ -498,6 +500,8 @@ public class SiteToSiteRestApiClient implements Closeable { } serverTransactionTtl = Integer.parseInt(serverTransactionTtlHeader.getValue()); break; + case RESPONSE_CODE_NO_CONTENT: + throw new NoContentException("Server has no flowfiles to provide"); default: try (InputStream content = response.getEntity().getContent()) { diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java index 6b6db3cd27..e29efd85de 100644 --- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java +++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -36,8 +37,6 @@ import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.reducing; import static java.util.stream.Collectors.toMap; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; @@ -212,10 +211,13 @@ public class TestPeerSelector { throw new IOException("Connection refused. " + peerFetchStatusesFrom + " is not running."); }).when(peerStatusProvider).fetchRemotePeerStatuses(any(PeerDescription.class)); + + ArrayList peers; + // 1st attempt. It uses the bootstrap node. peerSelector.refreshPeers(); - PeerStatus peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE); - assertNotNull(peerStatus); + peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE); + assert(!peers.isEmpty()); // Proceed time so that peer selector refresh statuses. peerStatuses.remove(bootstrapNodeStatus); @@ -223,33 +225,34 @@ public class TestPeerSelector { // 2nd attempt. peerSelector.refreshPeers(); - peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE); - assertNotNull(peerStatus); - assertEquals("Node2 should be returned since node 2 is the only available node.", node2, peerStatus.getPeerDescription()); + peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE); + assert(!peers.isEmpty()); + assertEquals("Node2 should be returned since node 2 is the only available node.", node2, peers.get(0).getPeerDescription()); // Proceed time so that peer selector refresh statuses. systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1; // 3rd attempt. peerSelector.refreshPeers(); - peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE); - assertNotNull(peerStatus); - assertEquals("Node2 should be returned since node 2 is the only available node.", node2, peerStatus.getPeerDescription()); + peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE); + assert(!peers.isEmpty()); + assertEquals("Node2 should be returned since node 2 is the only available node.", node2, peers.get(0).getPeerDescription()); // Remove node2 to simulate that it goes down. There's no available node at this point. peerStatuses.remove(node2Status); systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1; peerSelector.refreshPeers(); - peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE); - assertNull("PeerSelector should return null as next peer status, since there's no available peer", peerStatus); + peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE); + assertTrue("PeerSelector should return an empty list as next peer statuses, since there's no available peer", peers.isEmpty()); // Add node1 back. PeerSelector should be able to fetch peer statuses because it always tries to fetch at least from the bootstrap node. peerStatuses.add(bootstrapNodeStatus); systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1; peerSelector.refreshPeers(); - peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE); - assertEquals("Node1 should be returned since node 1 is the only available node.", bootstrapNode, peerStatus.getPeerDescription()); + peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE); + assert(!peers.isEmpty()); + assertEquals("Node1 should be returned since node 1 is the only available node.", bootstrapNode, peers.get(0).getPeerDescription()); } } diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java index ded1db194b..706b4ca299 100644 --- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java @@ -57,6 +57,7 @@ import org.apache.nifi.remote.client.KeystoreType; import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.codec.StandardFlowFileCodec; import org.apache.nifi.remote.exception.HandshakeException; +import org.apache.nifi.remote.exception.NoValidPeerException; import org.apache.nifi.remote.io.CompressionInputStream; import org.apache.nifi.remote.io.CompressionOutputStream; import org.apache.nifi.remote.protocol.DataPacket; @@ -197,6 +198,21 @@ public class TestHttpClient { } + public static class EmptyPortTransactionsServlet extends PortTransactionsServlet { + + @Override + protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + + final int reqProtocolVersion = getReqProtocolVersion(req); + if (reqProtocolVersion == 1) { + super.doPost(req, resp); + } else { + respondWithText(resp, "No flowfiles available", 204); + } + } + + } + public static class PortTransactionsAccessDeniedServlet extends HttpServlet { @Override @@ -497,6 +513,8 @@ public class TestHttpClient { servletHandler.addServletWithMapping(OutputPortTransactionServlet.class, "/data-transfer/output-ports/output-timeout-data-ex-id/transactions/transaction-id"); servletHandler.addServletWithMapping(FlowFilesTimeoutAfterDataExchangeServlet.class, "/data-transfer/output-ports/output-timeout-data-ex-id/transactions/transaction-id/flow-files"); + servletHandler.addServletWithMapping(EmptyPortTransactionsServlet.class,"/data-transfer/output-ports/empty-output-running-id/transactions"); + server.start(); logger.info("Starting server on port {} for HTTP, and {} for HTTPS", httpConnector.getLocalPort(), sslConnector.getLocalPort()); @@ -656,6 +674,13 @@ public class TestHttpClient { runningOutputPort.setState(ScheduledState.RUNNING.name()); outputPorts.add(runningOutputPort); + final PortDTO emptyRunningOutputPort = new PortDTO(); + emptyRunningOutputPort.setName("empty-output-running"); + emptyRunningOutputPort.setId("empty-output-running-id"); + emptyRunningOutputPort.setType("OUTPUT_PORT"); + emptyRunningOutputPort.setState(ScheduledState.RUNNING.name()); + outputPorts.add(emptyRunningOutputPort); + final PortDTO timeoutOutputPort = new PortDTO(); timeoutOutputPort.setName("output-timeout"); timeoutOutputPort.setId("output-timeout-id"); @@ -718,9 +743,10 @@ public class TestHttpClient { .build() ) { final Transaction transaction = client.createTransaction(TransferDirection.SEND); + fail(); - assertNull(transaction); - + } catch (final NoValidPeerException e) { + assertNotNull(e.getMessage()); } } @@ -737,9 +763,10 @@ public class TestHttpClient { .build() ) { final Transaction transaction = client.createTransaction(TransferDirection.SEND); + fail(); - assertNull(transaction); - + } catch (final NoValidPeerException e) { + assertNotNull(e.getMessage()); } } @@ -755,11 +782,11 @@ public class TestHttpClient { .build() ) { final Transaction transaction = client.createTransaction(TransferDirection.SEND); + fail(); - assertNull(transaction); - + } catch (final NoValidPeerException e) { + assertNotNull(e.getMessage()); } - } @Test @@ -854,7 +881,10 @@ public class TestHttpClient { .build() ) { final Transaction transaction = client.createTransaction(TransferDirection.SEND); - assertNull("createTransaction should fail at peer selection and return null.", transaction); + fail(); + + } catch (final NoValidPeerException e) { + assertNotNull("createTransaction should fail at peer selection and return null.", e.getMessage()); } } @@ -1224,6 +1254,23 @@ public class TestHttpClient { } } + @Test + public void testReceiveEmptyPort() throws Exception { + + try ( + SiteToSiteClient client = getDefaultBuilder() + .portName("empty-output-running") + .build() + ) { + try { + final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); + assertNull(transaction); + } catch (IOException e) { + fail(); + } + } + } + private void testReceive(SiteToSiteClient client) throws IOException { final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java index a327313818..ec86dff2e5 100644 --- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java +++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java @@ -20,7 +20,6 @@ import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.createDataPack import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveOneFlowFile; import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveTwoFlowFiles; import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveWithInvalidChecksum; -import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveZeroFlowFile; import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendButDestinationFull; import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendOneFlowFile; import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendTwoFlowFiles; @@ -43,6 +42,7 @@ import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.codec.StandardFlowFileCodec; +import org.apache.nifi.remote.exception.NoContentException; import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession; import org.apache.nifi.remote.io.socket.SocketChannelInput; import org.apache.nifi.remote.io.socket.SocketChannelOutput; @@ -51,6 +51,7 @@ import org.apache.nifi.remote.protocol.RequestType; import org.apache.nifi.remote.protocol.Response; import org.apache.nifi.remote.protocol.ResponseCode; import org.junit.Test; +import static org.junit.Assert.fail; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,14 +92,12 @@ public class TestSocketClientTransaction { ByteArrayInputStream bis = new ByteArrayInputStream(serverResponseBos.toByteArray()); ByteArrayOutputStream bos = new ByteArrayOutputStream(); - SocketClientTransaction transaction = getClientTransaction(bis, bos, TransferDirection.RECEIVE); - - execReceiveZeroFlowFile(transaction); - - // Verify what client has sent. - DataInputStream sentByClient = new DataInputStream(new ByteArrayInputStream(bos.toByteArray())); - assertEquals(RequestType.RECEIVE_FLOWFILES, RequestType.readRequestType(sentByClient)); - assertEquals(-1, sentByClient.read()); + try { + SocketClientTransaction transaction = getClientTransaction(bis, bos, TransferDirection.RECEIVE); + fail(); + } catch (final NoContentException e) { + assertEquals("Remote side has no flowfiles to provide", e.getMessage()); + } } @Test diff --git a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java b/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java index 83a7e42ed7..278e6b68a6 100644 --- a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java +++ b/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java @@ -147,6 +147,13 @@ public class NiFiReceiver extends Receiver { try { while (!isStopped()) { final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); + if (transaction == null) { + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + } + continue; + } DataPacket dataPacket = transaction.receive(); if (dataPacket == null) { transaction.confirm(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index e51ea37558..3b4d6308d1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -36,6 +36,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.client.SiteToSiteClientConfig; +import org.apache.nifi.remote.exception.NoValidPeerException; import org.apache.nifi.remote.exception.PortNotRunningException; import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.UnknownPortException; @@ -232,9 +233,16 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { } final SiteToSiteClient client = getSiteToSiteClient(); - final Transaction transaction; + Transaction transaction = null; try { transaction = client.createTransaction(transferDirection); + } catch (final NoValidPeerException e) { + final String message = String.format("%s Unable to create transaction to communicate with; all peers must be penalized, so yielding context", this); + logger.debug(message); + session.rollback(); + context.yield(); + remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); + return; } catch (final PortNotRunningException e) { context.yield(); this.targetRunning.set(false); @@ -270,11 +278,10 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); return; } - if (transaction == null) { - logger.debug("{} Unable to create transaction to communicate with; all peers must be penalized, so yielding context", this); - session.rollback(); context.yield(); + final String message = String.format("%s successfully connected to %s, but it has no flowfiles to provide, yielding", this, url); + logger.debug(message); return; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java index c8787d3a07..f74d66edc5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java @@ -31,6 +31,7 @@ import org.apache.nifi.authorization.PublicPortAuthorizable; import org.apache.nifi.authorization.resource.ResourceType; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; +import org.apache.nifi.connectable.Connection; import org.apache.nifi.remote.HttpRemoteSiteListener; import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.PeerDescription; @@ -77,6 +78,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.List; import java.net.InetAddress; import java.net.UnknownHostException; @@ -90,6 +92,7 @@ import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERT import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE; import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION; import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.PROTOCOL_VERSION; /** * RESTful endpoint for managing a SiteToSite connection. @@ -205,9 +208,21 @@ public class DataTransferResource extends ApplicationResource { final int transportProtocolVersion = validationResult.transportProtocolVersion; try { - // Execute handshake. - initiateServerProtocol(req, peer, transportProtocolVersion); + HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(req, peer, transportProtocolVersion); + int protocolVersion = Integer.parseUnsignedInt(req.getHeader(PROTOCOL_VERSION)); + + if ((protocolVersion >= 2) && PORT_TYPE_OUTPUT.equals(portType)) { + List connectionList = serverProtocol.getPort().getIncomingConnections(); + if (connectionList.stream().allMatch(c -> c.getFlowFileQueue().isEmpty())) { + // Transaction could be created, but there is nothing to transfer. Just return 200. + logger.debug("Output port has no flowfiles to transfer, returning 200"); + transactionManager.cancelTransaction(transactionId); + return noCache(Response.status(Response.Status.NO_CONTENT)).type(MediaType.TEXT_PLAIN).entity("No flowfiles available").build(); + } + } + + // Execute handshake. TransactionResultEntity entity = new TransactionResultEntity(); entity.setResponseCode(ResponseCode.PROPERTIES_OK.getCode()); entity.setMessage("Handshake properties are valid, and port is running. A transaction is created:" + transactionId); diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessRemoteOutputPort.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessRemoteOutputPort.java index cd82fe7ebd..054370643c 100644 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessRemoteOutputPort.java +++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessRemoteOutputPort.java @@ -17,6 +17,7 @@ package org.apache.nifi.stateless.core; import org.apache.nifi.events.EventReporter; +import org.apache.nifi.remote.exception.NoValidPeerException; import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.DataUnit; @@ -114,8 +115,8 @@ public class StatelessRemoteOutputPort extends AbstractStatelessComponent { try { final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); if (transaction == null) { - getLogger().error("Unable to create a transaction for Remote Process Group {} to pull from port {}", new Object[]{url, name}); - return false; + getLogger().debug("No flowfiles to receive"); + return true; } final Queue destinationQueue = new LinkedList<>(); @@ -139,6 +140,9 @@ public class StatelessRemoteOutputPort extends AbstractStatelessComponent { transaction.confirm(); transaction.complete(); + } catch (final NoValidPeerException e) { + getLogger().error("Unable to create a transaction for Remote Process Group {} to pull from port {}", new Object[]{url, name}); + return false; } catch (final Exception e) { getLogger().error("Failed to receive FlowFile via site-to-site", e); return false; diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java index e57dbbd544..00bdaf8968 100644 --- a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java +++ b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java @@ -30,6 +30,7 @@ import org.apache.commons.cli.ParseException; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.client.KeystoreType; import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.exception.NoContentException; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.remote.protocol.http.HttpProxy; import org.apache.nifi.util.FormatUtils; @@ -241,6 +242,8 @@ public class SiteToSiteCliMain { } else { new SiteToSiteReceiver(siteToSiteClient, output).receiveFiles(); } + } catch (final NoContentException e) { + System.out.println("Remote port has no flowfiles"); } } catch (Exception e) { printUsage(e.getMessage(), options); diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java index 88ad8f3781..82d19d207a 100644 --- a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java +++ b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java @@ -22,6 +22,7 @@ import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.TransactionCompletion; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.exception.NoContentException; import org.apache.nifi.remote.protocol.DataPacket; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; @@ -45,6 +46,9 @@ public class SiteToSiteReceiver { public TransactionCompletion receiveFiles() throws IOException { Transaction transaction = siteToSiteClient.createTransaction(TransferDirection.RECEIVE); + if (transaction == null) { + throw new NoContentException("Remote side has no flowfiles to provide"); + } JsonGenerator jsonGenerator = new JsonFactory().createJsonGenerator(output); jsonGenerator.writeStartArray(); DataPacket dataPacket;