NIFI-282: Fixed bug that caused load distribution across nodes in cluster not to work

This commit is contained in:
Mark Payne 2015-02-16 16:49:15 -05:00
parent 2f60ddc03a
commit a7405b915d
8 changed files with 198 additions and 105 deletions

View File

@ -25,6 +25,7 @@ import org.apache.nifi.remote.protocol.CommunicationsSession;
public class Peer implements Communicant {
private final PeerDescription description;
private final CommunicationsSession commsSession;
private final String url;
private final String clusterUrl;
@ -34,7 +35,8 @@ public class Peer implements Communicant {
private final Map<String, Long> penaltyExpirationMap = new HashMap<>();
private boolean closed = false;
public Peer(final CommunicationsSession commsSession, final String peerUrl, final String clusterUrl) {
public Peer(final PeerDescription description, final CommunicationsSession commsSession, final String peerUrl, final String clusterUrl) {
this.description = description;
this.commsSession = commsSession;
this.url = peerUrl;
this.clusterUrl = clusterUrl;
@ -48,6 +50,10 @@ public class Peer implements Communicant {
}
}
public PeerDescription getDescription() {
return description;
}
@Override
public String getUrl() {
return url;

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote;
public class PeerDescription {
private final String hostname;
private final int port;
private final boolean secure;
public PeerDescription(final String hostname, final int port, final boolean secure) {
this.hostname = hostname;
this.port = port;
this.secure = secure;
}
public String getHostname() {
return hostname;
}
public int getPort() {
return port;
}
public boolean isSecure() {
return secure;
}
@Override
public String toString() {
return "PeerDescription[hostname=" + hostname + ", port=" + port + ", secure=" + secure + "]";
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((hostname == null) ? 0 : hostname.hashCode());
result = prime * result + port;
return result;
}
@Override
public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final PeerDescription other = (PeerDescription) obj;
if (hostname == null) {
if (other.hostname != null) {
return false;
}
} else if (!hostname.equals(other.hostname)) {
return false;
}
return port == other.port;
}
}

View File

@ -17,43 +17,31 @@
package org.apache.nifi.remote;
public class PeerStatus {
private final String hostname;
private final int port;
private final boolean secure;
private final PeerDescription description;
private final int numFlowFiles;
public PeerStatus(final String hostname, final int port, final boolean secure, final int numFlowFiles) {
this.hostname = hostname;
this.port = port;
this.secure = secure;
public PeerStatus(final PeerDescription description, final int numFlowFiles) {
this.description = description;
this.numFlowFiles = numFlowFiles;
}
public String getHostname() {
return hostname;
public PeerDescription getPeerDescription() {
return description;
}
public int getPort() {
return port;
}
public boolean isSecure() {
return secure;
}
public int getFlowFileCount() {
return numFlowFiles;
}
@Override
public String toString() {
return "PeerStatus[hostname=" + hostname + ",port=" + port + ",secure=" + secure + ",flowFileCount=" + numFlowFiles + "]";
return "PeerStatus[hostname=" + description.getHostname() + ",port=" + description.getPort() +
",secure=" + description.isSecure() + ",flowFileCount=" + numFlowFiles + "]";
}
@Override
public int hashCode() {
return 9824372 + hostname.hashCode() + port;
return 9824372 + description.getHostname().hashCode() + description.getPort() * 41;
}
@Override
@ -67,6 +55,6 @@ public class PeerStatus {
}
final PeerStatus other = (PeerStatus) obj;
return port == other.port && hostname.equals(other.hostname);
return description.equals(other.getPeerDescription());
}
}

View File

@ -61,6 +61,7 @@ import javax.security.cert.CertificateNotYetValidException;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteDestination;
import org.apache.nifi.remote.RemoteResourceInitiator;
@ -97,8 +98,8 @@ public class EndpointConnectionPool {
private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionPool.class);
private final BlockingQueue<EndpointConnection> connectionQueue = new LinkedBlockingQueue<>();
private final ConcurrentMap<PeerStatus, Long> peerTimeoutExpirations = new ConcurrentHashMap<>();
private final ConcurrentMap<PeerDescription, BlockingQueue<EndpointConnection>> connectionQueueMap = new ConcurrentHashMap<>();
private final ConcurrentMap<PeerDescription, Long> peerTimeoutExpirations = new ConcurrentHashMap<>();
private final URI clusterUrl;
private final String apiUri;
@ -227,6 +228,23 @@ public class EndpointConnectionPool {
SocketClientProtocol protocol = null;
EndpointConnection connection;
Peer peer = null;
logger.debug("{} getting next peer status", this);
final PeerStatus peerStatus = getNextPeerStatus(direction);
logger.debug("{} next peer status = {}", this, peerStatus);
if ( peerStatus == null ) {
return null;
}
final PeerDescription peerDescription = peerStatus.getPeerDescription();
BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peerStatus);
if ( connectionQueue == null ) {
connectionQueue = new LinkedBlockingQueue<>();
BlockingQueue<EndpointConnection> existing = connectionQueueMap.putIfAbsent(peerDescription, connectionQueue);
if ( existing != null ) {
connectionQueue = existing;
}
}
final List<EndpointConnection> addBack = new ArrayList<>();
try {
@ -254,19 +272,12 @@ public class EndpointConnectionPool {
protocol = new SocketClientProtocol();
protocol.setDestination(new IdEnrichedRemoteDestination(remoteDestination, portId));
logger.debug("{} getting next peer status", this);
final PeerStatus peerStatus = getNextPeerStatus(direction);
logger.debug("{} next peer status = {}", this, peerStatus);
if ( peerStatus == null ) {
return null;
}
final long penalizationMillis = remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS);
try {
logger.debug("{} Establishing site-to-site connection with {}", this, peerStatus);
commsSession = establishSiteToSiteConnection(peerStatus);
} catch (final IOException ioe) {
penalize(peerStatus, penalizationMillis);
penalize(peerStatus.getPeerDescription(), penalizationMillis);
throw ioe;
}
@ -283,8 +294,8 @@ public class EndpointConnectionPool {
}
}
final String peerUrl = "nifi://" + peerStatus.getHostname() + ":" + peerStatus.getPort();
peer = new Peer(commsSession, peerUrl, clusterUrl.toString());
final String peerUrl = "nifi://" + peerDescription.getHostname() + ":" + peerDescription.getPort();
peer = new Peer(peerDescription, commsSession, peerUrl, clusterUrl.toString());
// set properties based on config
if ( config != null ) {
@ -371,6 +382,11 @@ public class EndpointConnectionPool {
return false;
}
final BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peer.getDescription());
if ( connectionQueue == null ) {
return false;
}
activeConnections.remove(endpointConnection);
if ( shutdown ) {
terminate(endpointConnection);
@ -381,14 +397,14 @@ public class EndpointConnectionPool {
}
}
private void penalize(final PeerStatus status, final long penalizationMillis) {
Long expiration = peerTimeoutExpirations.get(status);
private void penalize(final PeerDescription peerDescription, final long penalizationMillis) {
Long expiration = peerTimeoutExpirations.get(peerDescription);
if ( expiration == null ) {
expiration = Long.valueOf(0L);
}
final long newExpiration = Math.max(expiration, System.currentTimeMillis() + penalizationMillis);
peerTimeoutExpirations.put(status, Long.valueOf(newExpiration));
peerTimeoutExpirations.put(peerDescription, Long.valueOf(newExpiration));
}
/**
@ -396,19 +412,7 @@ public class EndpointConnectionPool {
* @param peer
*/
public void penalize(final Peer peer, final long penalizationMillis) {
String host;
int port;
try {
final URI uri = new URI(peer.getUrl());
host = uri.getHost();
port = uri.getPort();
} catch (final URISyntaxException e) {
host = peer.getHost();
port = -1;
}
final PeerStatus status = new PeerStatus(host, port, true, 1);
penalize(status, penalizationMillis);
penalize(peer.getDescription(), penalizationMillis);
}
private void cleanup(final SocketClientProtocol protocol, final Peer peer) {
@ -509,7 +513,8 @@ public class EndpointConnectionPool {
final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
final List<NodeInformation> nodeInfos = new ArrayList<>();
for ( final PeerStatus peerStatus : statuses ) {
final NodeInformation nodeInfo = new NodeInformation(peerStatus.getHostname(), peerStatus.getPort(), 0, peerStatus.isSecure(), peerStatus.getFlowFileCount());
final PeerDescription description = peerStatus.getPeerDescription();
final NodeInformation nodeInfo = new NodeInformation(description.getHostname(), description.getPort(), 0, description.isSecure(), peerStatus.getFlowFileCount());
nodeInfos.add(nodeInfo);
}
clusterNodeInfo.setNodeInformation(nodeInfos);
@ -526,7 +531,7 @@ public class EndpointConnectionPool {
if (cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis()) {
final Set<PeerStatus> equalizedSet = new HashSet<>(cache.getStatuses().size());
for (final PeerStatus status : cache.getStatuses()) {
final PeerStatus equalizedStatus = new PeerStatus(status.getHostname(), status.getPort(), status.isSecure(), 1);
final PeerStatus equalizedStatus = new PeerStatus(status.getPeerDescription(), 1);
equalizedSet.add(equalizedStatus);
}
@ -543,8 +548,9 @@ public class EndpointConnectionPool {
throw new IOException("Remote instance of NiFi is not configured to allow site-to-site communications");
}
final PeerDescription clusterPeerDescription = new PeerDescription(hostname, port, clusterUrl.toString().startsWith("https://"));
final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port);
final Peer peer = new Peer(commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString());
final Peer peer = new Peer(clusterPeerDescription, commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString());
final SocketClientProtocol clientProtocol = new SocketClientProtocol();
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
@ -602,7 +608,8 @@ public class EndpointConnectionPool {
final OutputStream out = new BufferedOutputStream(fos)) {
for (final PeerStatus status : statuses) {
final String line = status.getHostname() + ":" + status.getPort() + ":" + status.isSecure() + "\n";
final PeerDescription description = status.getPeerDescription();
final String line = description.getHostname() + ":" + description.getPort() + ":" + description.isSecure() + "\n";
out.write(line.getBytes(StandardCharsets.UTF_8));
}
@ -631,7 +638,7 @@ public class EndpointConnectionPool {
final int port = Integer.parseInt(splits[1]);
final boolean secure = Boolean.parseBoolean(splits[2]);
statuses.add(new PeerStatus(hostname, port, secure, 1));
statuses.add(new PeerStatus(new PeerDescription(hostname, port, secure), 1));
}
}
@ -640,7 +647,8 @@ public class EndpointConnectionPool {
private CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException {
return establishSiteToSiteConnection(peerStatus.getHostname(), peerStatus.getPort());
final PeerDescription description = peerStatus.getPeerDescription();
return establishSiteToSiteConnection(description.getHostname(), description.getPort());
}
private CommunicationsSession establishSiteToSiteConnection(final String hostname, final int port) throws IOException {
@ -720,7 +728,8 @@ public class EndpointConnectionPool {
final int index = n % destinations.size();
PeerStatus status = destinations.get(index);
if ( status == null ) {
status = new PeerStatus(nodeInfo.getHostname(), nodeInfo.getSiteToSitePort(), nodeInfo.isSiteToSiteSecure(), nodeInfo.getTotalFlowFiles());
final PeerDescription description = new PeerDescription(nodeInfo.getHostname(), nodeInfo.getSiteToSitePort(), nodeInfo.isSiteToSiteSecure());
status = new PeerStatus(description, nodeInfo.getTotalFlowFiles());
destinations.set(index, status);
break;
} else {
@ -744,27 +753,29 @@ public class EndpointConnectionPool {
private void cleanupExpiredSockets() {
final List<EndpointConnection> connections = new ArrayList<>();
EndpointConnection connection;
while ((connection = connectionQueue.poll()) != null) {
// If the socket has not been used in 10 seconds, shut it down.
final long lastUsed = connection.getLastTimeUsed();
if ( lastUsed < System.currentTimeMillis() - idleExpirationMillis ) {
try {
connection.getSocketClientProtocol().shutdown(connection.getPeer());
} catch (final Exception e) {
logger.debug("Failed to shut down {} using {} due to {}",
new Object[] {connection.getSocketClientProtocol(), connection.getPeer(), e} );
for ( final BlockingQueue<EndpointConnection> connectionQueue : connectionQueueMap.values()) {
final List<EndpointConnection> connections = new ArrayList<>();
EndpointConnection connection;
while ((connection = connectionQueue.poll()) != null) {
// If the socket has not been used in 10 seconds, shut it down.
final long lastUsed = connection.getLastTimeUsed();
if ( lastUsed < System.currentTimeMillis() - idleExpirationMillis ) {
try {
connection.getSocketClientProtocol().shutdown(connection.getPeer());
} catch (final Exception e) {
logger.debug("Failed to shut down {} using {} due to {}",
new Object[] {connection.getSocketClientProtocol(), connection.getPeer(), e} );
}
terminate(connection);
} else {
connections.add(connection);
}
terminate(connection);
} else {
connections.add(connection);
}
connectionQueue.addAll(connections);
}
connectionQueue.addAll(connections);
}
public void shutdown() {
@ -775,10 +786,12 @@ public class EndpointConnectionPool {
for ( final EndpointConnection conn : activeConnections ) {
conn.getPeer().getCommunicationsSession().interrupt();
}
EndpointConnection state;
while ( (state = connectionQueue.poll()) != null) {
cleanup(state.getSocketClientProtocol(), state.getPeer());
for ( final BlockingQueue<EndpointConnection> connectionQueue : connectionQueueMap.values() ) {
EndpointConnection state;
while ( (state = connectionQueue.poll()) != null) {
cleanup(state.getSocketClientProtocol(), state.getPeer());
}
}
}

View File

@ -34,6 +34,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteDestination;
import org.apache.nifi.remote.RemoteResourceInitiator;
@ -117,7 +118,7 @@ public class SocketClientProtocol implements ClientProtocol {
properties.put(HandshakeProperty.GZIP, String.valueOf(useCompression));
if ( destinationId != null ) {
properties.put(HandshakeProperty.PORT_IDENTIFIER, destination.getIdentifier());
properties.put(HandshakeProperty.PORT_IDENTIFIER, destinationId);
}
properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(timeoutMillis) );
@ -229,7 +230,7 @@ public class SocketClientProtocol implements ClientProtocol {
final int port = dis.readInt();
final boolean secure = dis.readBoolean();
final int flowFileCount = dis.readInt();
peers.add(new PeerStatus(hostname, port, secure, flowFileCount));
peers.add(new PeerStatus(new PeerDescription(hostname, port, secure), flowFileCount));
}
logger.debug("{} Received {} Peer Statuses from {}", this, peers.size(), peer);

View File

@ -41,7 +41,7 @@ public class TestEndpointConnectionStatePool {
clusterNodeInfo.setNodeInformation(collection);
final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
for ( final PeerStatus peerStatus : destinations ) {
System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
System.out.println(peerStatus.getPeerDescription());
}
}
@ -55,7 +55,7 @@ public class TestEndpointConnectionStatePool {
clusterNodeInfo.setNodeInformation(collection);
final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
for ( final PeerStatus peerStatus : destinations ) {
System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
System.out.println(peerStatus.getPeerDescription());
}
}
@ -75,7 +75,7 @@ public class TestEndpointConnectionStatePool {
clusterNodeInfo.setNodeInformation(collection);
final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
for ( final PeerStatus peerStatus : destinations ) {
System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
System.out.println(peerStatus.getPeerDescription());
}
}
@ -89,7 +89,7 @@ public class TestEndpointConnectionStatePool {
clusterNodeInfo.setNodeInformation(collection);
final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
for ( final PeerStatus peerStatus : destinations ) {
System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
System.out.println(peerStatus.getPeerDescription());
}
}
}

View File

@ -43,26 +43,31 @@ public class TestSiteToSiteClient {
final SiteToSiteClient client = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("cba")
.requestBatchCount(1)
.requestBatchCount(10)
.build();
try {
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
Assert.assertNotNull(transaction);
final DataPacket packet = transaction.receive();
Assert.assertNotNull(packet);
final InputStream in = packet.getData();
final long size = packet.getSize();
final byte[] buff = new byte[(int) size];
StreamUtils.fillBuffer(in, buff);
Assert.assertNull(transaction.receive());
transaction.confirm();
transaction.complete();
for (int i=0; i < 1000; i++) {
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
Assert.assertNotNull(transaction);
DataPacket packet;
while (true) {
packet = transaction.receive();
if ( packet == null ) {
break;
}
final InputStream in = packet.getData();
final long size = packet.getSize();
final byte[] buff = new byte[(int) size];
StreamUtils.fillBuffer(in, buff);
}
transaction.confirm();
transaction.complete();
}
} finally {
client.close();
}
@ -70,7 +75,7 @@ public class TestSiteToSiteClient {
@Test
//@Ignore("For local testing only; not really a unit test but a manual test")
@Ignore("For local testing only; not really a unit test but a manual test")
public void testSend() throws IOException {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");

View File

@ -206,7 +206,8 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
protocol.setRootProcessGroup(rootGroup.get());
protocol.setNodeInformant(nodeInformant);
peer = new Peer(commsSession, peerUri, "nifi://localhost:" + getPort());
final PeerDescription description = new PeerDescription("localhost", getPort(), sslContext != null);
peer = new Peer(description, commsSession, peerUri, "nifi://localhost:" + getPort());
LOG.debug("Handshaking....");
protocol.handshake(peer);