NIFI-6530 - HTTP SiteToSite server returns 201 in case no data is available

This closes #3647.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Arpad Boda 2019-08-12 17:30:30 +02:00 committed by Koji Kawamura
parent 92fd3129ba
commit f01668e66a
No known key found for this signature in database
GPG Key ID: 36136B0EC89E4758
18 changed files with 253 additions and 73 deletions

View File

@ -262,7 +262,7 @@ public class PeerSelector {
* for RECEIVE, a peer with more flow files is preferred * for RECEIVE, a peer with more flow files is preferred
* @return a selected peer, if there is no available peer or all peers are penalized, then return null * @return a selected peer, if there is no available peer or all peers are penalized, then return null
*/ */
public PeerStatus getNextPeerStatus(final TransferDirection direction) { public ArrayList<PeerStatus> getPeerStatuses(final TransferDirection direction) {
List<PeerStatus> peerList = peerStatuses; List<PeerStatus> peerList = peerStatuses;
if (isPeerRefreshNeeded(peerList)) { if (isPeerRefreshNeeded(peerList)) {
peerRefreshLock.lock(); peerRefreshLock.lock();
@ -289,25 +289,15 @@ public class PeerSelector {
} }
} }
if (peerList == null || peerList.isEmpty()) { if (peerList == null || peerList.isEmpty()) {
return null; return new ArrayList<PeerStatus>();
} }
PeerStatus peerStatus; ArrayList<PeerStatus> retVal = new ArrayList<>(peerList);
for (int i = 0; i < peerList.size(); i++) { retVal.removeIf(p -> isPenalized(p));
final long idx = peerIndex.getAndIncrement();
final int listIndex = (int) (idx % peerList.size());
peerStatus = peerList.get(listIndex);
if (isPenalized(peerStatus)) { return retVal;
logger.debug("{} {} is penalized; will not communicate with this peer", this, peerStatus);
} else {
return peerStatus;
}
}
logger.debug("{} All peers appear to be penalized; returning null", this);
return null;
} }
private List<PeerStatus> createPeerStatusList(final TransferDirection direction) throws IOException { private List<PeerStatus> createPeerStatusList(final TransferDirection direction) throws IOException {

View File

@ -27,6 +27,8 @@ import org.apache.nifi.remote.client.PeerSelector;
import org.apache.nifi.remote.client.PeerStatusProvider; import org.apache.nifi.remote.client.PeerStatusProvider;
import org.apache.nifi.remote.client.SiteToSiteClientConfig; import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.NoContentException;
import org.apache.nifi.remote.exception.NoValidPeerException;
import org.apache.nifi.remote.exception.PortNotRunningException; import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.UnknownPortException; import org.apache.nifi.remote.exception.UnknownPortException;
@ -40,6 +42,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
@ -124,9 +127,11 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
@Override @Override
public Transaction createTransaction(final TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException { public Transaction createTransaction(final TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException {
final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS); final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS);
Integer peersWithNoContent = 0;
PeerStatus peerStatus; ArrayList<PeerStatus> peers = peerSelector.getPeerStatuses(direction);
while ((peerStatus = peerSelector.getNextPeerStatus(direction)) != null) {
for (PeerStatus peerStatus : peers) {
logger.debug("peerStatus={}", peerStatus); logger.debug("peerStatus={}", peerStatus);
final CommunicationsSession commSession = new HttpCommunicationsSession(); final CommunicationsSession commSession = new HttpCommunicationsSession();
@ -168,6 +173,11 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
try { try {
transactionUrl = apiClient.initiateTransaction(direction, portId); transactionUrl = apiClient.initiateTransaction(direction, portId);
commSession.setUserDn(apiClient.getTrustedPeerDn()); commSession.setUserDn(apiClient.getTrustedPeerDn());
} catch (final NoContentException e) {
apiClient.close();
peersWithNoContent++;
logger.debug("Peer {} has no flowfiles to provide", peer);
continue;
} catch (final Exception e) { } catch (final Exception e) {
apiClient.close(); apiClient.close();
logger.warn("Penalizing a peer {} due to {}", peer, e.toString()); logger.warn("Penalizing a peer {} due to {}", peer, e.toString());
@ -210,8 +220,12 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
return transaction; return transaction;
} }
logger.info("Couldn't find a valid peer to communicate with."); if(peersWithNoContent > 0) {
return null; return null;
}
String error = new String("Couldn't find a valid peer to communicate with.");
logger.info(error);
throw new NoValidPeerException(error);
} }
private String resolveNodeApiUrl(final PeerDescription description) { private String resolveNodeApiUrl(final PeerDescription description) {

View File

@ -32,6 +32,7 @@ public class TransportProtocolVersionNegotiator extends StandardVersionNegotiato
public int getTransactionProtocolVersion() { public int getTransactionProtocolVersion() {
switch (getVersion()) { switch (getVersion()) {
case 1: case 1:
case 2:
return 5; return 5;
default: default:
throw new RuntimeException("Transport protocol version " + getVersion() throw new RuntimeException("Transport protocol version " + getVersion()

View File

@ -29,6 +29,7 @@ import org.apache.nifi.remote.client.SiteInfoProvider;
import org.apache.nifi.remote.client.SiteToSiteClientConfig; import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.NoValidPeerException;
import org.apache.nifi.remote.exception.PortNotRunningException; import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.TransmissionDisabledException; import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.exception.UnknownPortException; import org.apache.nifi.remote.exception.UnknownPortException;
@ -165,14 +166,9 @@ public class EndpointConnectionPool implements PeerStatusProvider {
throw new UnreachableClusterException("Unable to refresh details from any of the configured remote instances.", ioe); throw new UnreachableClusterException("Unable to refresh details from any of the configured remote instances.", ioe);
} }
do { for (PeerStatus peerStatus : peerSelector.getPeerStatuses(direction)) {
final List<EndpointConnection> addBack = new ArrayList<>(); final List<EndpointConnection> addBack = new ArrayList<>();
logger.debug("{} getting next peer status", this);
final PeerStatus peerStatus = peerSelector.getNextPeerStatus(direction);
logger.debug("{} next peer status = {}", this, peerStatus); logger.debug("{} next peer status = {}", this, peerStatus);
if (peerStatus == null) {
return null;
}
final PeerDescription peerDescription = peerStatus.getPeerDescription(); final PeerDescription peerDescription = peerStatus.getPeerDescription();
BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peerDescription); BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peerDescription);
@ -192,7 +188,7 @@ public class EndpointConnectionPool implements PeerStatusProvider {
if (connection == null && !addBack.isEmpty()) { if (connection == null && !addBack.isEmpty()) {
// all available connections have been penalized. // all available connections have been penalized.
logger.debug("{} all Connections for {} are penalized; returning no Connection", this, portId); logger.debug("{} all Connections for {} are penalized; returning no Connection", this, portId);
return null; throw new NoValidPeerException("All peers are penalized");
} }
if (connection != null && connection.getPeer().isPenalized(portId)) { if (connection != null && connection.getPeer().isPenalized(portId)) {
@ -318,10 +314,13 @@ public class EndpointConnectionPool implements PeerStatusProvider {
} }
} }
} while (connection == null || codec == null || commsSession == null || protocol == null); if( connection != null && codec != null && commsSession != null && protocol != null) {
activeConnections.add(connection);
return connection;
}
}
throw new NoValidPeerException("Didn't find any valid peer to connect to");
activeConnections.add(connection);
return connection;
} }
public boolean offer(final EndpointConnection endpointConnection) { public boolean offer(final EndpointConnection endpointConnection) {

View File

@ -23,6 +23,7 @@ import org.apache.nifi.remote.TransactionCompletion;
import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.AbstractSiteToSiteClient; import org.apache.nifi.remote.client.AbstractSiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig; import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.exception.NoContentException;
import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.remote.protocol.DataPacket;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -125,14 +126,13 @@ public class SocketClient extends AbstractSiteToSiteClient {
} }
final EndpointConnection connectionState = pool.getEndpointConnection(direction, getConfig()); final EndpointConnection connectionState = pool.getEndpointConnection(direction, getConfig());
if (connectionState == null) {
return null;
}
final Transaction transaction; final Transaction transaction;
try { try {
transaction = connectionState.getSocketClientProtocol().startTransaction( transaction = connectionState.getSocketClientProtocol().startTransaction(
connectionState.getPeer(), connectionState.getCodec(), direction); connectionState.getPeer(), connectionState.getCodec(), direction);
} catch (final NoContentException e) {
return null;
} catch (final Throwable t) { } catch (final Throwable t) {
pool.terminate(connectionState); pool.terminate(connectionState);
throw new IOException("Unable to create Transaction to communicate with " + connectionState.getPeer(), t); throw new IOException("Unable to create Transaction to communicate with " + connectionState.getPeer(), t);

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.exception;
import java.io.IOException;
/**
* A NoContentException occurs when the remote peer has no flowfiles to provide
*/
public class NoContentException extends IOException {
private static final long serialVersionUID = -689032011082690815L;
public NoContentException(final String message, final Throwable cause) {
super(message, cause);
}
public NoContentException(final String message) {
super(message);
}
public NoContentException(final Throwable cause) {
super(cause);
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.exception;
import java.io.IOException;
/**
* A NoValidPeerException occurs when all the remote peers are penalized or none exists
*/
public class NoValidPeerException extends IOException {
private static final long serialVersionUID = 8421102798129193880L;
public NoValidPeerException(final String message, final Throwable cause) {
super(message, cause);
}
public NoValidPeerException(final String message) {
super(message);
}
public NoValidPeerException(final Throwable cause) {
super(cause);
}
}

View File

@ -21,6 +21,7 @@ import org.apache.nifi.remote.AbstractTransaction;
import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.NoContentException;
import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.protocol.RequestType; import org.apache.nifi.remote.protocol.RequestType;
import org.apache.nifi.remote.protocol.Response; import org.apache.nifi.remote.protocol.Response;
@ -43,6 +44,9 @@ public class SocketClientTransaction extends AbstractTransaction {
this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream()); this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
initialize(); initialize();
if (direction == TransferDirection.RECEIVE && !this.dataAvailable){
throw new NoContentException("Remote side has no flowfiles to provide");
}
} }
private void initialize() throws IOException { private void initialize() throws IOException {

View File

@ -64,6 +64,7 @@ import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator; import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator;
import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.NoContentException;
import org.apache.nifi.remote.exception.PortNotRunningException; import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.UnknownPortException; import org.apache.nifi.remote.exception.UnknownPortException;
@ -147,6 +148,7 @@ public class SiteToSiteRestApiClient implements Closeable {
private static final int RESPONSE_CODE_OK = 200; private static final int RESPONSE_CODE_OK = 200;
private static final int RESPONSE_CODE_CREATED = 201; private static final int RESPONSE_CODE_CREATED = 201;
private static final int RESPONSE_CODE_ACCEPTED = 202; private static final int RESPONSE_CODE_ACCEPTED = 202;
private static final int RESPONSE_CODE_NO_CONTENT = 204;
private static final int RESPONSE_CODE_BAD_REQUEST = 400; private static final int RESPONSE_CODE_BAD_REQUEST = 400;
private static final int RESPONSE_CODE_FORBIDDEN = 403; private static final int RESPONSE_CODE_FORBIDDEN = 403;
private static final int RESPONSE_CODE_NOT_FOUND = 404; private static final int RESPONSE_CODE_NOT_FOUND = 404;
@ -171,7 +173,7 @@ public class SiteToSiteRestApiClient implements Closeable {
private int batchCount = 0; private int batchCount = 0;
private long batchSize = 0; private long batchSize = 0;
private long batchDurationMillis = 0; private long batchDurationMillis = 0;
private TransportProtocolVersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(1); private TransportProtocolVersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(2,1);
private String trustedPeerDn; private String trustedPeerDn;
private final ScheduledExecutorService ttlExtendTaskExecutor; private final ScheduledExecutorService ttlExtendTaskExecutor;
@ -498,6 +500,8 @@ public class SiteToSiteRestApiClient implements Closeable {
} }
serverTransactionTtl = Integer.parseInt(serverTransactionTtlHeader.getValue()); serverTransactionTtl = Integer.parseInt(serverTransactionTtlHeader.getValue());
break; break;
case RESPONSE_CODE_NO_CONTENT:
throw new NoContentException("Server has no flowfiles to provide");
default: default:
try (InputStream content = response.getEntity().getContent()) { try (InputStream content = response.getEntity().getContent()) {

View File

@ -25,6 +25,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -36,8 +37,6 @@ import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.reducing; import static java.util.stream.Collectors.reducing;
import static java.util.stream.Collectors.toMap; import static java.util.stream.Collectors.toMap;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
@ -212,10 +211,13 @@ public class TestPeerSelector {
throw new IOException("Connection refused. " + peerFetchStatusesFrom + " is not running."); throw new IOException("Connection refused. " + peerFetchStatusesFrom + " is not running.");
}).when(peerStatusProvider).fetchRemotePeerStatuses(any(PeerDescription.class)); }).when(peerStatusProvider).fetchRemotePeerStatuses(any(PeerDescription.class));
ArrayList<PeerStatus> peers;
// 1st attempt. It uses the bootstrap node. // 1st attempt. It uses the bootstrap node.
peerSelector.refreshPeers(); peerSelector.refreshPeers();
PeerStatus peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE); peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE);
assertNotNull(peerStatus); assert(!peers.isEmpty());
// Proceed time so that peer selector refresh statuses. // Proceed time so that peer selector refresh statuses.
peerStatuses.remove(bootstrapNodeStatus); peerStatuses.remove(bootstrapNodeStatus);
@ -223,33 +225,34 @@ public class TestPeerSelector {
// 2nd attempt. // 2nd attempt.
peerSelector.refreshPeers(); peerSelector.refreshPeers();
peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE); peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE);
assertNotNull(peerStatus); assert(!peers.isEmpty());
assertEquals("Node2 should be returned since node 2 is the only available node.", node2, peerStatus.getPeerDescription()); assertEquals("Node2 should be returned since node 2 is the only available node.", node2, peers.get(0).getPeerDescription());
// Proceed time so that peer selector refresh statuses. // Proceed time so that peer selector refresh statuses.
systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1; systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1;
// 3rd attempt. // 3rd attempt.
peerSelector.refreshPeers(); peerSelector.refreshPeers();
peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE); peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE);
assertNotNull(peerStatus); assert(!peers.isEmpty());
assertEquals("Node2 should be returned since node 2 is the only available node.", node2, peerStatus.getPeerDescription()); assertEquals("Node2 should be returned since node 2 is the only available node.", node2, peers.get(0).getPeerDescription());
// Remove node2 to simulate that it goes down. There's no available node at this point. // Remove node2 to simulate that it goes down. There's no available node at this point.
peerStatuses.remove(node2Status); peerStatuses.remove(node2Status);
systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1; systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1;
peerSelector.refreshPeers(); peerSelector.refreshPeers();
peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE); peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE);
assertNull("PeerSelector should return null as next peer status, since there's no available peer", peerStatus); assertTrue("PeerSelector should return an empty list as next peer statuses, since there's no available peer", peers.isEmpty());
// Add node1 back. PeerSelector should be able to fetch peer statuses because it always tries to fetch at least from the bootstrap node. // Add node1 back. PeerSelector should be able to fetch peer statuses because it always tries to fetch at least from the bootstrap node.
peerStatuses.add(bootstrapNodeStatus); peerStatuses.add(bootstrapNodeStatus);
systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1; systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1;
peerSelector.refreshPeers(); peerSelector.refreshPeers();
peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE); peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE);
assertEquals("Node1 should be returned since node 1 is the only available node.", bootstrapNode, peerStatus.getPeerDescription()); assert(!peers.isEmpty());
assertEquals("Node1 should be returned since node 1 is the only available node.", bootstrapNode, peers.get(0).getPeerDescription());
} }
} }

View File

@ -57,6 +57,7 @@ import org.apache.nifi.remote.client.KeystoreType;
import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.codec.StandardFlowFileCodec; import org.apache.nifi.remote.codec.StandardFlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.NoValidPeerException;
import org.apache.nifi.remote.io.CompressionInputStream; import org.apache.nifi.remote.io.CompressionInputStream;
import org.apache.nifi.remote.io.CompressionOutputStream; import org.apache.nifi.remote.io.CompressionOutputStream;
import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.remote.protocol.DataPacket;
@ -197,6 +198,21 @@ public class TestHttpClient {
} }
public static class EmptyPortTransactionsServlet extends PortTransactionsServlet {
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
final int reqProtocolVersion = getReqProtocolVersion(req);
if (reqProtocolVersion == 1) {
super.doPost(req, resp);
} else {
respondWithText(resp, "No flowfiles available", 204);
}
}
}
public static class PortTransactionsAccessDeniedServlet extends HttpServlet { public static class PortTransactionsAccessDeniedServlet extends HttpServlet {
@Override @Override
@ -497,6 +513,8 @@ public class TestHttpClient {
servletHandler.addServletWithMapping(OutputPortTransactionServlet.class, "/data-transfer/output-ports/output-timeout-data-ex-id/transactions/transaction-id"); servletHandler.addServletWithMapping(OutputPortTransactionServlet.class, "/data-transfer/output-ports/output-timeout-data-ex-id/transactions/transaction-id");
servletHandler.addServletWithMapping(FlowFilesTimeoutAfterDataExchangeServlet.class, "/data-transfer/output-ports/output-timeout-data-ex-id/transactions/transaction-id/flow-files"); servletHandler.addServletWithMapping(FlowFilesTimeoutAfterDataExchangeServlet.class, "/data-transfer/output-ports/output-timeout-data-ex-id/transactions/transaction-id/flow-files");
servletHandler.addServletWithMapping(EmptyPortTransactionsServlet.class,"/data-transfer/output-ports/empty-output-running-id/transactions");
server.start(); server.start();
logger.info("Starting server on port {} for HTTP, and {} for HTTPS", httpConnector.getLocalPort(), sslConnector.getLocalPort()); logger.info("Starting server on port {} for HTTP, and {} for HTTPS", httpConnector.getLocalPort(), sslConnector.getLocalPort());
@ -656,6 +674,13 @@ public class TestHttpClient {
runningOutputPort.setState(ScheduledState.RUNNING.name()); runningOutputPort.setState(ScheduledState.RUNNING.name());
outputPorts.add(runningOutputPort); outputPorts.add(runningOutputPort);
final PortDTO emptyRunningOutputPort = new PortDTO();
emptyRunningOutputPort.setName("empty-output-running");
emptyRunningOutputPort.setId("empty-output-running-id");
emptyRunningOutputPort.setType("OUTPUT_PORT");
emptyRunningOutputPort.setState(ScheduledState.RUNNING.name());
outputPorts.add(emptyRunningOutputPort);
final PortDTO timeoutOutputPort = new PortDTO(); final PortDTO timeoutOutputPort = new PortDTO();
timeoutOutputPort.setName("output-timeout"); timeoutOutputPort.setName("output-timeout");
timeoutOutputPort.setId("output-timeout-id"); timeoutOutputPort.setId("output-timeout-id");
@ -718,9 +743,10 @@ public class TestHttpClient {
.build() .build()
) { ) {
final Transaction transaction = client.createTransaction(TransferDirection.SEND); final Transaction transaction = client.createTransaction(TransferDirection.SEND);
fail();
assertNull(transaction); } catch (final NoValidPeerException e) {
assertNotNull(e.getMessage());
} }
} }
@ -737,9 +763,10 @@ public class TestHttpClient {
.build() .build()
) { ) {
final Transaction transaction = client.createTransaction(TransferDirection.SEND); final Transaction transaction = client.createTransaction(TransferDirection.SEND);
fail();
assertNull(transaction); } catch (final NoValidPeerException e) {
assertNotNull(e.getMessage());
} }
} }
@ -755,11 +782,11 @@ public class TestHttpClient {
.build() .build()
) { ) {
final Transaction transaction = client.createTransaction(TransferDirection.SEND); final Transaction transaction = client.createTransaction(TransferDirection.SEND);
fail();
assertNull(transaction); } catch (final NoValidPeerException e) {
assertNotNull(e.getMessage());
} }
} }
@Test @Test
@ -854,7 +881,10 @@ public class TestHttpClient {
.build() .build()
) { ) {
final Transaction transaction = client.createTransaction(TransferDirection.SEND); final Transaction transaction = client.createTransaction(TransferDirection.SEND);
assertNull("createTransaction should fail at peer selection and return null.", transaction); fail();
} catch (final NoValidPeerException e) {
assertNotNull("createTransaction should fail at peer selection and return null.", e.getMessage());
} }
} }
@ -1224,6 +1254,23 @@ public class TestHttpClient {
} }
} }
@Test
public void testReceiveEmptyPort() throws Exception {
try (
SiteToSiteClient client = getDefaultBuilder()
.portName("empty-output-running")
.build()
) {
try {
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
assertNull(transaction);
} catch (IOException e) {
fail();
}
}
}
private void testReceive(SiteToSiteClient client) throws IOException { private void testReceive(SiteToSiteClient client) throws IOException {
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);

View File

@ -20,7 +20,6 @@ import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.createDataPack
import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveOneFlowFile; import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveOneFlowFile;
import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveTwoFlowFiles; import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveTwoFlowFiles;
import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveWithInvalidChecksum; import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveWithInvalidChecksum;
import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveZeroFlowFile;
import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendButDestinationFull; import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendButDestinationFull;
import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendOneFlowFile; import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendOneFlowFile;
import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendTwoFlowFiles; import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendTwoFlowFiles;
@ -43,6 +42,7 @@ import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.codec.StandardFlowFileCodec; import org.apache.nifi.remote.codec.StandardFlowFileCodec;
import org.apache.nifi.remote.exception.NoContentException;
import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession; import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
import org.apache.nifi.remote.io.socket.SocketChannelInput; import org.apache.nifi.remote.io.socket.SocketChannelInput;
import org.apache.nifi.remote.io.socket.SocketChannelOutput; import org.apache.nifi.remote.io.socket.SocketChannelOutput;
@ -51,6 +51,7 @@ import org.apache.nifi.remote.protocol.RequestType;
import org.apache.nifi.remote.protocol.Response; import org.apache.nifi.remote.protocol.Response;
import org.apache.nifi.remote.protocol.ResponseCode; import org.apache.nifi.remote.protocol.ResponseCode;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.fail;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -91,14 +92,12 @@ public class TestSocketClientTransaction {
ByteArrayInputStream bis = new ByteArrayInputStream(serverResponseBos.toByteArray()); ByteArrayInputStream bis = new ByteArrayInputStream(serverResponseBos.toByteArray());
ByteArrayOutputStream bos = new ByteArrayOutputStream(); ByteArrayOutputStream bos = new ByteArrayOutputStream();
SocketClientTransaction transaction = getClientTransaction(bis, bos, TransferDirection.RECEIVE); try {
SocketClientTransaction transaction = getClientTransaction(bis, bos, TransferDirection.RECEIVE);
execReceiveZeroFlowFile(transaction); fail();
} catch (final NoContentException e) {
// Verify what client has sent. assertEquals("Remote side has no flowfiles to provide", e.getMessage());
DataInputStream sentByClient = new DataInputStream(new ByteArrayInputStream(bos.toByteArray())); }
assertEquals(RequestType.RECEIVE_FLOWFILES, RequestType.readRequestType(sentByClient));
assertEquals(-1, sentByClient.read());
} }
@Test @Test

View File

@ -147,6 +147,13 @@ public class NiFiReceiver extends Receiver<NiFiDataPacket> {
try { try {
while (!isStopped()) { while (!isStopped()) {
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
if (transaction == null) {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
continue;
}
DataPacket dataPacket = transaction.receive(); DataPacket dataPacket = transaction.receive();
if (dataPacket == null) { if (dataPacket == null) {
transaction.confirm(); transaction.confirm();

View File

@ -36,6 +36,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig; import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.exception.NoValidPeerException;
import org.apache.nifi.remote.exception.PortNotRunningException; import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.UnknownPortException; import org.apache.nifi.remote.exception.UnknownPortException;
@ -232,9 +233,16 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
} }
final SiteToSiteClient client = getSiteToSiteClient(); final SiteToSiteClient client = getSiteToSiteClient();
final Transaction transaction; Transaction transaction = null;
try { try {
transaction = client.createTransaction(transferDirection); transaction = client.createTransaction(transferDirection);
} catch (final NoValidPeerException e) {
final String message = String.format("%s Unable to create transaction to communicate with; all peers must be penalized, so yielding context", this);
logger.debug(message);
session.rollback();
context.yield();
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
return;
} catch (final PortNotRunningException e) { } catch (final PortNotRunningException e) {
context.yield(); context.yield();
this.targetRunning.set(false); this.targetRunning.set(false);
@ -270,11 +278,10 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
return; return;
} }
if (transaction == null) { if (transaction == null) {
logger.debug("{} Unable to create transaction to communicate with; all peers must be penalized, so yielding context", this);
session.rollback();
context.yield(); context.yield();
final String message = String.format("%s successfully connected to %s, but it has no flowfiles to provide, yielding", this, url);
logger.debug(message);
return; return;
} }

View File

@ -31,6 +31,7 @@ import org.apache.nifi.authorization.PublicPortAuthorizable;
import org.apache.nifi.authorization.resource.ResourceType; import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.remote.HttpRemoteSiteListener; import org.apache.nifi.remote.HttpRemoteSiteListener;
import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription; import org.apache.nifi.remote.PeerDescription;
@ -77,6 +78,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.List;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
@ -90,6 +92,7 @@ import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERT
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE; import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION; import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION; import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.PROTOCOL_VERSION;
/** /**
* RESTful endpoint for managing a SiteToSite connection. * RESTful endpoint for managing a SiteToSite connection.
@ -205,9 +208,21 @@ public class DataTransferResource extends ApplicationResource {
final int transportProtocolVersion = validationResult.transportProtocolVersion; final int transportProtocolVersion = validationResult.transportProtocolVersion;
try { try {
// Execute handshake. HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(req, peer, transportProtocolVersion);
initiateServerProtocol(req, peer, transportProtocolVersion);
int protocolVersion = Integer.parseUnsignedInt(req.getHeader(PROTOCOL_VERSION));
if ((protocolVersion >= 2) && PORT_TYPE_OUTPUT.equals(portType)) {
List<Connection> connectionList = serverProtocol.getPort().getIncomingConnections();
if (connectionList.stream().allMatch(c -> c.getFlowFileQueue().isEmpty())) {
// Transaction could be created, but there is nothing to transfer. Just return 200.
logger.debug("Output port has no flowfiles to transfer, returning 200");
transactionManager.cancelTransaction(transactionId);
return noCache(Response.status(Response.Status.NO_CONTENT)).type(MediaType.TEXT_PLAIN).entity("No flowfiles available").build();
}
}
// Execute handshake.
TransactionResultEntity entity = new TransactionResultEntity(); TransactionResultEntity entity = new TransactionResultEntity();
entity.setResponseCode(ResponseCode.PROPERTIES_OK.getCode()); entity.setResponseCode(ResponseCode.PROPERTIES_OK.getCode());
entity.setMessage("Handshake properties are valid, and port is running. A transaction is created:" + transactionId); entity.setMessage("Handshake properties are valid, and port is running. A transaction is created:" + transactionId);

View File

@ -17,6 +17,7 @@
package org.apache.nifi.stateless.core; package org.apache.nifi.stateless.core;
import org.apache.nifi.events.EventReporter; import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.exception.NoValidPeerException;
import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile; import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.DataUnit;
@ -114,8 +115,8 @@ public class StatelessRemoteOutputPort extends AbstractStatelessComponent {
try { try {
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
if (transaction == null) { if (transaction == null) {
getLogger().error("Unable to create a transaction for Remote Process Group {} to pull from port {}", new Object[]{url, name}); getLogger().debug("No flowfiles to receive");
return false; return true;
} }
final Queue<StatelessFlowFile> destinationQueue = new LinkedList<>(); final Queue<StatelessFlowFile> destinationQueue = new LinkedList<>();
@ -139,6 +140,9 @@ public class StatelessRemoteOutputPort extends AbstractStatelessComponent {
transaction.confirm(); transaction.confirm();
transaction.complete(); transaction.complete();
} catch (final NoValidPeerException e) {
getLogger().error("Unable to create a transaction for Remote Process Group {} to pull from port {}", new Object[]{url, name});
return false;
} catch (final Exception e) { } catch (final Exception e) {
getLogger().error("Failed to receive FlowFile via site-to-site", e); getLogger().error("Failed to receive FlowFile via site-to-site", e);
return false; return false;

View File

@ -30,6 +30,7 @@ import org.apache.commons.cli.ParseException;
import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.KeystoreType; import org.apache.nifi.remote.client.KeystoreType;
import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.exception.NoContentException;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.http.HttpProxy; import org.apache.nifi.remote.protocol.http.HttpProxy;
import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.FormatUtils;
@ -241,6 +242,8 @@ public class SiteToSiteCliMain {
} else { } else {
new SiteToSiteReceiver(siteToSiteClient, output).receiveFiles(); new SiteToSiteReceiver(siteToSiteClient, output).receiveFiles();
} }
} catch (final NoContentException e) {
System.out.println("Remote port has no flowfiles");
} }
} catch (Exception e) { } catch (Exception e) {
printUsage(e.getMessage(), options); printUsage(e.getMessage(), options);

View File

@ -22,6 +22,7 @@ import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransactionCompletion; import org.apache.nifi.remote.TransactionCompletion;
import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.exception.NoContentException;
import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.remote.protocol.DataPacket;
import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonGenerator;
@ -45,6 +46,9 @@ public class SiteToSiteReceiver {
public TransactionCompletion receiveFiles() throws IOException { public TransactionCompletion receiveFiles() throws IOException {
Transaction transaction = siteToSiteClient.createTransaction(TransferDirection.RECEIVE); Transaction transaction = siteToSiteClient.createTransaction(TransferDirection.RECEIVE);
if (transaction == null) {
throw new NoContentException("Remote side has no flowfiles to provide");
}
JsonGenerator jsonGenerator = new JsonFactory().createJsonGenerator(output); JsonGenerator jsonGenerator = new JsonFactory().createJsonGenerator(output);
jsonGenerator.writeStartArray(); jsonGenerator.writeStartArray();
DataPacket dataPacket; DataPacket dataPacket;