NIFI-282: Fixed bug that caused client not to be able to communicate with remote NiFi instance

This commit is contained in:
Mark Payne 2015-02-12 09:15:07 -05:00
parent d1e058cde7
commit 5c8a9e22d1
3 changed files with 47 additions and 33 deletions

View File

@ -38,6 +38,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@ -114,6 +115,7 @@ public class EndpointConnectionPool {
private final SSLContext sslContext;
private final ScheduledExecutorService taskExecutor;
private final int idleExpirationMillis;
private final RemoteDestination remoteDestination;
private final ReadWriteLock listeningPortRWLock = new ReentrantReadWriteLock();
private final Lock remoteInfoReadLock = listeningPortRWLock.readLock();
@ -128,15 +130,17 @@ public class EndpointConnectionPool {
private volatile boolean shutdown = false;
public EndpointConnectionPool(final String clusterUrl, final int commsTimeoutMillis, final int idleExpirationMillis,
final EventReporter eventReporter, final File persistenceFile)
public EndpointConnectionPool(final String clusterUrl, final RemoteDestination remoteDestination, final int commsTimeoutMillis,
final int idleExpirationMillis, final EventReporter eventReporter, final File persistenceFile)
{
this(clusterUrl, commsTimeoutMillis, idleExpirationMillis, null, eventReporter, persistenceFile);
this(clusterUrl, remoteDestination, commsTimeoutMillis, idleExpirationMillis, null, eventReporter, persistenceFile);
}
public EndpointConnectionPool(final String clusterUrl, final int commsTimeoutMillis, final int idleExpirationMillis,
public EndpointConnectionPool(final String clusterUrl, final RemoteDestination remoteDestination, final int commsTimeoutMillis, final int idleExpirationMillis,
final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile)
{
Objects.requireNonNull(clusterUrl, "URL cannot be null");
Objects.requireNonNull(remoteDestination, "Remote Destination/Port Identifier cannot be null");
try {
this.clusterUrl = new URI(clusterUrl);
} catch (final URISyntaxException e) {
@ -150,6 +154,7 @@ public class EndpointConnectionPool {
}
apiUri = this.clusterUrl.getScheme() + "://" + this.clusterUrl.getHost() + ":" + this.clusterUrl.getPort() + uriPath + "-api";
this.remoteDestination = remoteDestination;
this.sslContext = sslContext;
this.peersFile = persistenceFile;
this.eventReporter = eventReporter;
@ -197,12 +202,12 @@ public class EndpointConnectionPool {
}
public EndpointConnection getEndpointConnection(final RemoteDestination remoteDestination, final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
return getEndpointConnection(remoteDestination, direction, null);
public EndpointConnection getEndpointConnection(final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
return getEndpointConnection(direction, null);
}
public EndpointConnection getEndpointConnection(final RemoteDestination remoteDestination, final TransferDirection direction, final SiteToSiteClientConfig config) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
public EndpointConnection getEndpointConnection(final TransferDirection direction, final SiteToSiteClientConfig config) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
//
// Attempt to get a connection state that already exists for this URL.
//
@ -419,6 +424,7 @@ public class EndpointConnectionPool {
return (peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD);
}
private PeerStatus getNextPeerStatus(final TransferDirection direction) {
List<PeerStatus> peerList = peerStatuses;
if ( isPeerRefreshNeeded(peerList) ) {
@ -532,7 +538,12 @@ public class EndpointConnectionPool {
RemoteResourceInitiator.initiateResourceNegotiation(clientProtocol, dis, dos);
clientProtocol.setTimeout(commsTimeout);
clientProtocol.handshake(peer, null);
if (clientProtocol.getVersionNegotiator().getVersion() < 5) {
clientProtocol.handshake(peer, remoteDestination.getIdentifier());
} else {
clientProtocol.handshake(peer, null);
}
final Set<PeerStatus> peerStatuses = clientProtocol.getPeerStatuses(peer);
persistPeerStatuses(peerStatuses);

View File

@ -40,9 +40,11 @@ public class SocketClient implements SiteToSiteClient {
private final String portName;
private final long penalizationNanos;
private volatile String portIdentifier;
private volatile boolean closed = false;
public SocketClient(final SiteToSiteClientConfig config) {
pool = new EndpointConnectionPool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS),
pool = new EndpointConnectionPool(config.getUrl(), createRemoteDestination(config.getPortIdentifier()),
(int) config.getTimeout(TimeUnit.MILLISECONDS),
(int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS),
config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile());
@ -107,15 +109,16 @@ public class SocketClient implements SiteToSiteClient {
@Override
public Transaction createTransaction(final TransferDirection direction) throws IOException {
if ( closed ) {
throw new IllegalStateException("Client is closed");
}
final String portId = getPortIdentifier(direction);
if ( portId == null ) {
throw new IOException("Could not find Port with name '" + portName + "' for remote NiFi instance");
}
final RemoteDestination remoteDestination = createRemoteDestination(portId);
final EndpointConnection connectionState = pool.getEndpointConnection(remoteDestination, direction, getConfig());
final EndpointConnection connectionState = pool.getEndpointConnection(direction, getConfig());
if ( connectionState == null ) {
return null;
}
@ -196,6 +199,7 @@ public class SocketClient implements SiteToSiteClient {
@Override
public void close() throws IOException {
closed = true;
pool.shutdown();
}

View File

@ -75,28 +75,27 @@ public class TestSiteToSiteClient {
public void testSend() throws IOException {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
final SiteToSiteClient client = new SiteToSiteClient.Builder()
.url("http://10.0.64.63:8080/nifi")
.portName("input")
.nodePenalizationPeriod(10, TimeUnit.MILLISECONDS)
.build();
final SiteToSiteClient client = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("input")
.build();
try {
final Transaction transaction = client.createTransaction(TransferDirection.SEND);
Assert.assertNotNull(transaction);
final Map<String, String> attrs = new HashMap<>();
attrs.put("site-to-site", "yes, please!");
final byte[] bytes = "Hello".getBytes();
final ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
final DataPacket packet = new StandardDataPacket(attrs, bais, bytes.length);
transaction.send(packet);
transaction.confirm();
transaction.complete();
} finally {
client.close();
}
try {
final Transaction transaction = client.createTransaction(TransferDirection.SEND);
Assert.assertNotNull(transaction);
final Map<String, String> attrs = new HashMap<>();
attrs.put("site-to-site", "yes, please!");
final byte[] bytes = "Hello".getBytes();
final ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
final DataPacket packet = new StandardDataPacket(attrs, bais, bytes.length);
transaction.send(packet);
transaction.confirm();
transaction.complete();
} finally {
client.close();
}
}
}