NIFI-3377: NiFi RPG errors when switching between transport protocols. This closes #2340.

This commit is contained in:
Koji Kawamura 2017-12-14 17:23:25 +09:00 committed by Mark Payne
parent 2608351113
commit b8375a681a
1 changed files with 4 additions and 3 deletions

View File

@ -42,6 +42,7 @@ import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.UnknownPortException; import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.exception.UnreachableClusterException; import org.apache.nifi.remote.exception.UnreachableClusterException;
import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.http.HttpProxy; import org.apache.nifi.remote.protocol.http.HttpProxy;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient; import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.remote.util.StandardDataPacket; import org.apache.nifi.remote.util.StandardDataPacket;
@ -124,9 +125,9 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
this.targetId = targetId; this.targetId = targetId;
} }
private static File getPeerPersistenceFile(final String portId, final NiFiProperties nifiProperties) { private static File getPeerPersistenceFile(final String portId, final NiFiProperties nifiProperties, final SiteToSiteTransportProtocol transportProtocol) {
final File stateDir = nifiProperties.getPersistentStateDirectory(); final File stateDir = nifiProperties.getPersistentStateDirectory();
return new File(stateDir, portId + ".peers"); return new File(stateDir, String.format("%s_%s.peers", portId, transportProtocol.name()));
} }
@Override @Override
@ -180,7 +181,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
.sslContext(sslContext) .sslContext(sslContext)
.useCompression(isUseCompression()) .useCompression(isUseCompression())
.eventReporter(remoteGroup.getEventReporter()) .eventReporter(remoteGroup.getEventReporter())
.peerPersistenceFile(getPeerPersistenceFile(getIdentifier(), nifiProperties)) .peerPersistenceFile(getPeerPersistenceFile(getIdentifier(), nifiProperties, remoteGroup.getTransportProtocol()))
.nodePenalizationPeriod(penalizationMillis, TimeUnit.MILLISECONDS) .nodePenalizationPeriod(penalizationMillis, TimeUnit.MILLISECONDS)
.timeout(remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) .timeout(remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
.transportProtocol(remoteGroup.getTransportProtocol()) .transportProtocol(remoteGroup.getTransportProtocol())