From afad982e91debd1109a6ec6d1865a77e8b3470ee Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 10 Mar 2020 11:56:49 -0400 Subject: [PATCH] NIFI-7200: Revert "NIFI-6530 - HTTP SiteToSite server returns 201 in case no data is available" This reverts commit f01668e66ad2e45197915769e966a4be27e1592e. Signed-off-by: Joe Witt --- .../nifi/remote/client/PeerSelector.java | 22 +++++-- .../nifi/remote/client/http/HttpClient.java | 22 ++----- .../TransportProtocolVersionNegotiator.java | 1 - .../client/socket/EndpointConnectionPool.java | 19 +++--- .../remote/client/socket/SocketClient.java | 6 +- .../remote/exception/NoContentException.java | 39 ------------ .../exception/NoValidPeerException.java | 40 ------------ .../socket/SocketClientTransaction.java | 4 -- .../remote/util/SiteToSiteRestApiClient.java | 6 +- .../nifi/remote/client/TestPeerSelector.java | 31 +++++---- .../remote/client/http/TestHttpClient.java | 63 +++---------------- .../socket/TestSocketClientTransaction.java | 17 ++--- .../org/apache/nifi/spark/NiFiReceiver.java | 7 --- .../nifi/remote/StandardRemoteGroupPort.java | 13 +--- .../core/StatelessRemoteOutputPort.java | 8 +-- .../nifi/web/api/DataTransferResource.java | 19 +----- .../nifi/toolkit/s2s/SiteToSiteCliMain.java | 3 - .../nifi/toolkit/s2s/SiteToSiteReceiver.java | 4 -- 18 files changed, 72 insertions(+), 252 deletions(-) delete mode 100644 nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoContentException.java delete mode 100644 nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoValidPeerException.java diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java index 8235a38a7a..0a61077b9d 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java @@ -224,7 +224,7 @@ public class PeerSelector { * for RECEIVE, a peer with more flow files is preferred * @return a selected peer, if there is no available peer or all peers are penalized, then return null */ - public ArrayList getPeerStatuses(final TransferDirection direction) { + public PeerStatus getNextPeerStatus(final TransferDirection direction) { List peerList = peerStatuses; if (isPeerRefreshNeeded(peerList)) { peerRefreshLock.lock(); @@ -251,15 +251,25 @@ public class PeerSelector { } } - if (peerList == null || peerList.isEmpty()) { - return new ArrayList(); + return null; } - ArrayList retVal = new ArrayList<>(peerList); - retVal.removeIf(p -> isPenalized(p)); + PeerStatus peerStatus; + for (int i = 0; i < peerList.size(); i++) { + final long idx = peerIndex.getAndIncrement(); + final int listIndex = (int) (idx % peerList.size()); + peerStatus = peerList.get(listIndex); - return retVal; + if (isPenalized(peerStatus)) { + logger.debug("{} {} is penalized; will not communicate with this peer", this, peerStatus); + } else { + return peerStatus; + } + } + + logger.debug("{} All peers appear to be penalized; returning null", this); + return null; } private List createPeerStatusList(final TransferDirection direction) throws IOException { diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java index 690cdfd6a9..660f5ea4ad 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java @@ -27,8 +27,6 @@ import org.apache.nifi.remote.client.PeerSelector; import org.apache.nifi.remote.client.PeerStatusProvider; import org.apache.nifi.remote.client.SiteToSiteClientConfig; import org.apache.nifi.remote.exception.HandshakeException; -import org.apache.nifi.remote.exception.NoContentException; -import org.apache.nifi.remote.exception.NoValidPeerException; import org.apache.nifi.remote.exception.PortNotRunningException; import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.UnknownPortException; @@ -43,7 +41,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -128,11 +125,9 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr @Override public Transaction createTransaction(final TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException { final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS); - Integer peersWithNoContent = 0; - ArrayList peers = peerSelector.getPeerStatuses(direction); - - for (PeerStatus peerStatus : peers) { + PeerStatus peerStatus; + while ((peerStatus = peerSelector.getNextPeerStatus(direction)) != null) { logger.debug("peerStatus={}", peerStatus); final CommunicationsSession commSession = new HttpCommunicationsSession(); @@ -174,11 +169,6 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr try { transactionUrl = apiClient.initiateTransaction(direction, portId); commSession.setUserDn(apiClient.getTrustedPeerDn()); - } catch (final NoContentException e) { - apiClient.close(); - peersWithNoContent++; - logger.debug("Peer {} has no flowfiles to provide", peer); - continue; } catch (final Exception e) { apiClient.close(); logger.warn("Penalizing a peer {} due to {}", peer, e.toString()); @@ -221,12 +211,8 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr return transaction; } - if(peersWithNoContent > 0) { - return null; - } - String error = new String("Couldn't find a valid peer to communicate with."); - logger.info(error); - throw new NoValidPeerException(error); + logger.info("Couldn't find a valid peer to communicate with."); + return null; } private String resolveNodeApiUrl(final PeerDescription description) { diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/TransportProtocolVersionNegotiator.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/TransportProtocolVersionNegotiator.java index 844a92e9a7..d0a636871b 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/TransportProtocolVersionNegotiator.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/TransportProtocolVersionNegotiator.java @@ -32,7 +32,6 @@ public class TransportProtocolVersionNegotiator extends StandardVersionNegotiato public int getTransactionProtocolVersion() { switch (getVersion()) { case 1: - case 2: return 5; default: throw new RuntimeException("Transport protocol version " + getVersion() diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java index 53bd963e18..0cf1b5328a 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java @@ -30,7 +30,6 @@ import org.apache.nifi.remote.client.SiteInfoProvider; import org.apache.nifi.remote.client.SiteToSiteClientConfig; import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.exception.HandshakeException; -import org.apache.nifi.remote.exception.NoValidPeerException; import org.apache.nifi.remote.exception.PortNotRunningException; import org.apache.nifi.remote.exception.TransmissionDisabledException; import org.apache.nifi.remote.exception.UnknownPortException; @@ -166,9 +165,14 @@ public class EndpointConnectionPool implements PeerStatusProvider { throw new UnreachableClusterException("Unable to refresh details from any of the configured remote instances.", ioe); } - for (PeerStatus peerStatus : peerSelector.getPeerStatuses(direction)) { + do { final List addBack = new ArrayList<>(); + logger.debug("{} getting next peer status", this); + final PeerStatus peerStatus = peerSelector.getNextPeerStatus(direction); logger.debug("{} next peer status = {}", this, peerStatus); + if (peerStatus == null) { + return null; + } final PeerDescription peerDescription = peerStatus.getPeerDescription(); BlockingQueue connectionQueue = connectionQueueMap.get(peerDescription); @@ -188,7 +192,7 @@ public class EndpointConnectionPool implements PeerStatusProvider { if (connection == null && !addBack.isEmpty()) { // all available connections have been penalized. logger.debug("{} all Connections for {} are penalized; returning no Connection", this, portId); - throw new NoValidPeerException("All peers are penalized"); + return null; } if (connection != null && connection.getPeer().isPenalized(portId)) { @@ -314,13 +318,10 @@ public class EndpointConnectionPool implements PeerStatusProvider { } } - if( connection != null && codec != null && commsSession != null && protocol != null) { - activeConnections.add(connection); - return connection; - } - } - throw new NoValidPeerException("Didn't find any valid peer to connect to"); + } while (connection == null || codec == null || commsSession == null || protocol == null); + activeConnections.add(connection); + return connection; } public boolean offer(final EndpointConnection endpointConnection) { diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java index ff8e0d6e4c..64a174a397 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java @@ -23,7 +23,6 @@ import org.apache.nifi.remote.TransactionCompletion; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.client.AbstractSiteToSiteClient; import org.apache.nifi.remote.client.SiteToSiteClientConfig; -import org.apache.nifi.remote.exception.NoContentException; import org.apache.nifi.remote.protocol.DataPacket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,13 +125,14 @@ public class SocketClient extends AbstractSiteToSiteClient { } final EndpointConnection connectionState = pool.getEndpointConnection(direction, getConfig()); + if (connectionState == null) { + return null; + } final Transaction transaction; try { transaction = connectionState.getSocketClientProtocol().startTransaction( connectionState.getPeer(), connectionState.getCodec(), direction); - } catch (final NoContentException e) { - return null; } catch (final Throwable t) { pool.terminate(connectionState); throw new IOException("Unable to create Transaction to communicate with " + connectionState.getPeer(), t); diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoContentException.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoContentException.java deleted file mode 100644 index a0dd23d03a..0000000000 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoContentException.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.exception; - -import java.io.IOException; - -/** - * A NoContentException occurs when the remote peer has no flowfiles to provide - */ -public class NoContentException extends IOException { - - private static final long serialVersionUID = -689032011082690815L; - - public NoContentException(final String message, final Throwable cause) { - super(message, cause); - } - - public NoContentException(final String message) { - super(message); - } - - public NoContentException(final Throwable cause) { - super(cause); - } -} \ No newline at end of file diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoValidPeerException.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoValidPeerException.java deleted file mode 100644 index 30a51a0ca6..0000000000 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoValidPeerException.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.exception; - -import java.io.IOException; - - -/** - * A NoValidPeerException occurs when all the remote peers are penalized or none exists - */ -public class NoValidPeerException extends IOException { - - private static final long serialVersionUID = 8421102798129193880L; - - public NoValidPeerException(final String message, final Throwable cause) { - super(message, cause); - } - - public NoValidPeerException(final String message) { - super(message); - } - - public NoValidPeerException(final Throwable cause) { - super(cause); - } -} diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java index 85d6c1a6f3..8b68c9e16a 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java @@ -21,7 +21,6 @@ import org.apache.nifi.remote.AbstractTransaction; import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.codec.FlowFileCodec; -import org.apache.nifi.remote.exception.NoContentException; import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.protocol.RequestType; import org.apache.nifi.remote.protocol.Response; @@ -44,9 +43,6 @@ public class SocketClientTransaction extends AbstractTransaction { this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream()); initialize(); - if (direction == TransferDirection.RECEIVE && !this.dataAvailable){ - throw new NoContentException("Remote side has no flowfiles to provide"); - } } private void initialize() throws IOException { diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java index 32707087cb..249325d44b 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java @@ -64,7 +64,6 @@ import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator; import org.apache.nifi.remote.exception.HandshakeException; -import org.apache.nifi.remote.exception.NoContentException; import org.apache.nifi.remote.exception.PortNotRunningException; import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.UnknownPortException; @@ -148,7 +147,6 @@ public class SiteToSiteRestApiClient implements Closeable { private static final int RESPONSE_CODE_OK = 200; private static final int RESPONSE_CODE_CREATED = 201; private static final int RESPONSE_CODE_ACCEPTED = 202; - private static final int RESPONSE_CODE_NO_CONTENT = 204; private static final int RESPONSE_CODE_BAD_REQUEST = 400; private static final int RESPONSE_CODE_FORBIDDEN = 403; private static final int RESPONSE_CODE_NOT_FOUND = 404; @@ -173,7 +171,7 @@ public class SiteToSiteRestApiClient implements Closeable { private int batchCount = 0; private long batchSize = 0; private long batchDurationMillis = 0; - private TransportProtocolVersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(2,1); + private TransportProtocolVersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(1); private String trustedPeerDn; private final ScheduledExecutorService ttlExtendTaskExecutor; @@ -500,8 +498,6 @@ public class SiteToSiteRestApiClient implements Closeable { } serverTransactionTtl = Integer.parseInt(serverTransactionTtlHeader.getValue()); break; - case RESPONSE_CODE_NO_CONTENT: - throw new NoContentException("Server has no flowfiles to provide"); default: try (InputStream content = response.getEntity().getContent()) { diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java index 72dd9a6448..d98774e7ef 100644 --- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java +++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java @@ -35,7 +35,6 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -50,6 +49,8 @@ import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.reducing; import static java.util.stream.Collectors.toMap; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; @@ -227,13 +228,10 @@ public class TestPeerSelector { throw new IOException("Connection refused. " + peerFetchStatusesFrom + " is not running."); }).when(peerStatusProvider).fetchRemotePeerStatuses(any(PeerDescription.class)); - - ArrayList peers; - // 1st attempt. It uses the bootstrap node. peerSelector.refreshPeers(); - peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE); - assert(!peers.isEmpty()); + PeerStatus peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE); + assertNotNull(peerStatus); // Proceed time so that peer selector refresh statuses. peerStatuses.remove(bootstrapNodeStatus); @@ -241,35 +239,34 @@ public class TestPeerSelector { // 2nd attempt. peerSelector.refreshPeers(); - peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE); - assert(!peers.isEmpty()); - assertEquals("Node2 should be returned since node 2 is the only available node.", node2, peers.get(0).getPeerDescription()); + peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE); + assertNotNull(peerStatus); + assertEquals("Node2 should be returned since node 2 is the only available node.", node2, peerStatus.getPeerDescription()); // Proceed time so that peer selector refresh statuses. systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1; // 3rd attempt. peerSelector.refreshPeers(); - peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE); - assert(!peers.isEmpty()); - assertEquals("Node2 should be returned since node 2 is the only available node.", node2, peers.get(0).getPeerDescription()); + peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE); + assertNotNull(peerStatus); + assertEquals("Node2 should be returned since node 2 is the only available node.", node2, peerStatus.getPeerDescription()); // Remove node2 to simulate that it goes down. There's no available node at this point. peerStatuses.remove(node2Status); systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1; peerSelector.refreshPeers(); - peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE); - assertTrue("PeerSelector should return an empty list as next peer statuses, since there's no available peer", peers.isEmpty()); + peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE); + assertNull("PeerSelector should return null as next peer status, since there's no available peer", peerStatus); // Add node1 back. PeerSelector should be able to fetch peer statuses because it always tries to fetch at least from the bootstrap node. peerStatuses.add(bootstrapNodeStatus); systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1; peerSelector.refreshPeers(); - peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE); - assert(!peers.isEmpty()); - assertEquals("Node1 should be returned since node 1 is the only available node.", bootstrapNode, peers.get(0).getPeerDescription()); + peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE); + assertEquals("Node1 should be returned since node 1 is the only available node.", bootstrapNode, peerStatus.getPeerDescription()); } @Test diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java index 706b4ca299..ded1db194b 100644 --- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java @@ -57,7 +57,6 @@ import org.apache.nifi.remote.client.KeystoreType; import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.codec.StandardFlowFileCodec; import org.apache.nifi.remote.exception.HandshakeException; -import org.apache.nifi.remote.exception.NoValidPeerException; import org.apache.nifi.remote.io.CompressionInputStream; import org.apache.nifi.remote.io.CompressionOutputStream; import org.apache.nifi.remote.protocol.DataPacket; @@ -198,21 +197,6 @@ public class TestHttpClient { } - public static class EmptyPortTransactionsServlet extends PortTransactionsServlet { - - @Override - protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { - - final int reqProtocolVersion = getReqProtocolVersion(req); - if (reqProtocolVersion == 1) { - super.doPost(req, resp); - } else { - respondWithText(resp, "No flowfiles available", 204); - } - } - - } - public static class PortTransactionsAccessDeniedServlet extends HttpServlet { @Override @@ -513,8 +497,6 @@ public class TestHttpClient { servletHandler.addServletWithMapping(OutputPortTransactionServlet.class, "/data-transfer/output-ports/output-timeout-data-ex-id/transactions/transaction-id"); servletHandler.addServletWithMapping(FlowFilesTimeoutAfterDataExchangeServlet.class, "/data-transfer/output-ports/output-timeout-data-ex-id/transactions/transaction-id/flow-files"); - servletHandler.addServletWithMapping(EmptyPortTransactionsServlet.class,"/data-transfer/output-ports/empty-output-running-id/transactions"); - server.start(); logger.info("Starting server on port {} for HTTP, and {} for HTTPS", httpConnector.getLocalPort(), sslConnector.getLocalPort()); @@ -674,13 +656,6 @@ public class TestHttpClient { runningOutputPort.setState(ScheduledState.RUNNING.name()); outputPorts.add(runningOutputPort); - final PortDTO emptyRunningOutputPort = new PortDTO(); - emptyRunningOutputPort.setName("empty-output-running"); - emptyRunningOutputPort.setId("empty-output-running-id"); - emptyRunningOutputPort.setType("OUTPUT_PORT"); - emptyRunningOutputPort.setState(ScheduledState.RUNNING.name()); - outputPorts.add(emptyRunningOutputPort); - final PortDTO timeoutOutputPort = new PortDTO(); timeoutOutputPort.setName("output-timeout"); timeoutOutputPort.setId("output-timeout-id"); @@ -743,10 +718,9 @@ public class TestHttpClient { .build() ) { final Transaction transaction = client.createTransaction(TransferDirection.SEND); - fail(); - } catch (final NoValidPeerException e) { - assertNotNull(e.getMessage()); + assertNull(transaction); + } } @@ -763,10 +737,9 @@ public class TestHttpClient { .build() ) { final Transaction transaction = client.createTransaction(TransferDirection.SEND); - fail(); - } catch (final NoValidPeerException e) { - assertNotNull(e.getMessage()); + assertNull(transaction); + } } @@ -782,11 +755,11 @@ public class TestHttpClient { .build() ) { final Transaction transaction = client.createTransaction(TransferDirection.SEND); - fail(); - } catch (final NoValidPeerException e) { - assertNotNull(e.getMessage()); + assertNull(transaction); + } + } @Test @@ -881,10 +854,7 @@ public class TestHttpClient { .build() ) { final Transaction transaction = client.createTransaction(TransferDirection.SEND); - fail(); - - } catch (final NoValidPeerException e) { - assertNotNull("createTransaction should fail at peer selection and return null.", e.getMessage()); + assertNull("createTransaction should fail at peer selection and return null.", transaction); } } @@ -1254,23 +1224,6 @@ public class TestHttpClient { } } - @Test - public void testReceiveEmptyPort() throws Exception { - - try ( - SiteToSiteClient client = getDefaultBuilder() - .portName("empty-output-running") - .build() - ) { - try { - final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); - assertNull(transaction); - } catch (IOException e) { - fail(); - } - } - } - private void testReceive(SiteToSiteClient client) throws IOException { final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java index 048d61249a..edae052d12 100644 --- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java +++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java @@ -23,7 +23,6 @@ import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.codec.StandardFlowFileCodec; -import org.apache.nifi.remote.exception.NoContentException; import org.apache.nifi.remote.io.socket.SocketCommunicationsSession; import org.apache.nifi.remote.io.socket.SocketInput; import org.apache.nifi.remote.io.socket.SocketOutput; @@ -45,6 +44,7 @@ import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.createDataPack import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveOneFlowFile; import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveTwoFlowFiles; import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveWithInvalidChecksum; +import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveZeroFlowFile; import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendButDestinationFull; import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendOneFlowFile; import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendTwoFlowFiles; @@ -52,7 +52,6 @@ import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendWithIn import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendZeroFlowFile; import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.readContents; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -93,12 +92,14 @@ public class TestSocketClientTransaction { ByteArrayInputStream bis = new ByteArrayInputStream(serverResponseBos.toByteArray()); ByteArrayOutputStream bos = new ByteArrayOutputStream(); - try { - SocketClientTransaction transaction = getClientTransaction(bis, bos, TransferDirection.RECEIVE); - fail(); - } catch (final NoContentException e) { - assertEquals("Remote side has no flowfiles to provide", e.getMessage()); - } + SocketClientTransaction transaction = getClientTransaction(bis, bos, TransferDirection.RECEIVE); + + execReceiveZeroFlowFile(transaction); + + // Verify what client has sent. + DataInputStream sentByClient = new DataInputStream(new ByteArrayInputStream(bos.toByteArray())); + assertEquals(RequestType.RECEIVE_FLOWFILES, RequestType.readRequestType(sentByClient)); + assertEquals(-1, sentByClient.read()); } @Test diff --git a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java b/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java index 278e6b68a6..83a7e42ed7 100644 --- a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java +++ b/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java @@ -147,13 +147,6 @@ public class NiFiReceiver extends Receiver { try { while (!isStopped()) { final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); - if (transaction == null) { - try { - Thread.sleep(1000L); - } catch (InterruptedException e) { - } - continue; - } DataPacket dataPacket = transaction.receive(); if (dataPacket == null) { transaction.confirm(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index 95f5c2ee93..da050e2791 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -36,7 +36,6 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.client.SiteToSiteClientConfig; -import org.apache.nifi.remote.exception.NoValidPeerException; import org.apache.nifi.remote.exception.PortNotRunningException; import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.UnknownPortException; @@ -223,13 +222,6 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { final Transaction transaction; try { transaction = client.createTransaction(transferDirection); - } catch (final NoValidPeerException e) { - final String message = String.format("%s Unable to create transaction to communicate with; all peers must be penalized, so yielding context", this); - logger.debug(message); - session.rollback(); - context.yield(); - remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); - return; } catch (final PortNotRunningException e) { context.yield(); this.targetRunning.set(false); @@ -265,10 +257,11 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); return; } + if (transaction == null) { + logger.debug("{} Unable to create transaction to communicate with; all peers must be penalized, so yielding context", this); + session.rollback(); context.yield(); - final String message = String.format("%s successfully connected to %s, but it has no flowfiles to provide, yielding", this, url); - logger.debug(message); return; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless/src/main/java/org/apache/nifi/stateless/core/StatelessRemoteOutputPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless/src/main/java/org/apache/nifi/stateless/core/StatelessRemoteOutputPort.java index 054370643c..cd82fe7ebd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless/src/main/java/org/apache/nifi/stateless/core/StatelessRemoteOutputPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless/src/main/java/org/apache/nifi/stateless/core/StatelessRemoteOutputPort.java @@ -17,7 +17,6 @@ package org.apache.nifi.stateless.core; import org.apache.nifi.events.EventReporter; -import org.apache.nifi.remote.exception.NoValidPeerException; import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.DataUnit; @@ -115,8 +114,8 @@ public class StatelessRemoteOutputPort extends AbstractStatelessComponent { try { final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); if (transaction == null) { - getLogger().debug("No flowfiles to receive"); - return true; + getLogger().error("Unable to create a transaction for Remote Process Group {} to pull from port {}", new Object[]{url, name}); + return false; } final Queue destinationQueue = new LinkedList<>(); @@ -140,9 +139,6 @@ public class StatelessRemoteOutputPort extends AbstractStatelessComponent { transaction.confirm(); transaction.complete(); - } catch (final NoValidPeerException e) { - getLogger().error("Unable to create a transaction for Remote Process Group {} to pull from port {}", new Object[]{url, name}); - return false; } catch (final Exception e) { getLogger().error("Failed to receive FlowFile via site-to-site", e); return false; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java index f74d66edc5..c8787d3a07 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java @@ -31,7 +31,6 @@ import org.apache.nifi.authorization.PublicPortAuthorizable; import org.apache.nifi.authorization.resource.ResourceType; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; -import org.apache.nifi.connectable.Connection; import org.apache.nifi.remote.HttpRemoteSiteListener; import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.PeerDescription; @@ -78,7 +77,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.List; import java.net.InetAddress; import java.net.UnknownHostException; @@ -92,7 +90,6 @@ import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERT import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE; import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION; import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION; -import static org.apache.nifi.remote.protocol.http.HttpHeaders.PROTOCOL_VERSION; /** * RESTful endpoint for managing a SiteToSite connection. @@ -208,21 +205,9 @@ public class DataTransferResource extends ApplicationResource { final int transportProtocolVersion = validationResult.transportProtocolVersion; try { - HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(req, peer, transportProtocolVersion); - - int protocolVersion = Integer.parseUnsignedInt(req.getHeader(PROTOCOL_VERSION)); - - if ((protocolVersion >= 2) && PORT_TYPE_OUTPUT.equals(portType)) { - List connectionList = serverProtocol.getPort().getIncomingConnections(); - if (connectionList.stream().allMatch(c -> c.getFlowFileQueue().isEmpty())) { - // Transaction could be created, but there is nothing to transfer. Just return 200. - logger.debug("Output port has no flowfiles to transfer, returning 200"); - transactionManager.cancelTransaction(transactionId); - return noCache(Response.status(Response.Status.NO_CONTENT)).type(MediaType.TEXT_PLAIN).entity("No flowfiles available").build(); - } - } - // Execute handshake. + initiateServerProtocol(req, peer, transportProtocolVersion); + TransactionResultEntity entity = new TransactionResultEntity(); entity.setResponseCode(ResponseCode.PROPERTIES_OK.getCode()); entity.setMessage("Handshake properties are valid, and port is running. A transaction is created:" + transactionId); diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java index 00bdaf8968..e57dbbd544 100644 --- a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java +++ b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java @@ -30,7 +30,6 @@ import org.apache.commons.cli.ParseException; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.client.KeystoreType; import org.apache.nifi.remote.client.SiteToSiteClient; -import org.apache.nifi.remote.exception.NoContentException; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.remote.protocol.http.HttpProxy; import org.apache.nifi.util.FormatUtils; @@ -242,8 +241,6 @@ public class SiteToSiteCliMain { } else { new SiteToSiteReceiver(siteToSiteClient, output).receiveFiles(); } - } catch (final NoContentException e) { - System.out.println("Remote port has no flowfiles"); } } catch (Exception e) { printUsage(e.getMessage(), options); diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java index 82d19d207a..88ad8f3781 100644 --- a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java +++ b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java @@ -22,7 +22,6 @@ import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.TransactionCompletion; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.client.SiteToSiteClient; -import org.apache.nifi.remote.exception.NoContentException; import org.apache.nifi.remote.protocol.DataPacket; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; @@ -46,9 +45,6 @@ public class SiteToSiteReceiver { public TransactionCompletion receiveFiles() throws IOException { Transaction transaction = siteToSiteClient.createTransaction(TransferDirection.RECEIVE); - if (transaction == null) { - throw new NoContentException("Remote side has no flowfiles to provide"); - } JsonGenerator jsonGenerator = new JsonFactory().createJsonGenerator(output); jsonGenerator.writeStartArray(); DataPacket dataPacket;