From 4249fc943a5c6487cd2c19c646497bb95893a96e Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Tue, 15 Dec 2015 16:50:17 -0500 Subject: [PATCH] NIFI-1284 Creating inner class for SiteToSiteClientConfig to fix serialization issue --- nifi-commons/nifi-site-to-site-client/pom.xml | 6 + .../nifi/remote/client/SiteToSiteClient.java | 261 +++++++++++------- .../client/socket/TestSiteToSiteClient.java | 54 +++- 3 files changed, 212 insertions(+), 109 deletions(-) diff --git a/nifi-commons/nifi-site-to-site-client/pom.xml b/nifi-commons/nifi-site-to-site-client/pom.xml index 457f47dba4..52179cb01f 100644 --- a/nifi-commons/nifi-site-to-site-client/pom.xml +++ b/nifi-commons/nifi-site-to-site-client/pom.xml @@ -50,5 +50,11 @@ junit test + + com.esotericsoftware.kryo + kryo + 2.24.0 + test + diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java index 78237b997c..1581c42800 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java @@ -513,104 +513,7 @@ public interface SiteToSiteClient extends Closeable { * but does not create a SiteToSiteClient */ public SiteToSiteClientConfig buildConfig() { - return new SiteToSiteClientConfig() { - private static final long serialVersionUID = 1L; - - @Override - public boolean isUseCompression() { - return Builder.this.isUseCompression(); - } - - @Override - public String getUrl() { - return Builder.this.getUrl(); - } - - @Override - public long getTimeout(final TimeUnit timeUnit) { - return Builder.this.getTimeout(timeUnit); - } - - @Override - public long getIdleConnectionExpiration(final TimeUnit timeUnit) { - return Builder.this.getIdleConnectionExpiration(timeUnit); - } - - @Override - public SSLContext getSslContext() { - return Builder.this.getSslContext(); - } - - @Override - public String getPortName() { - return Builder.this.getPortName(); - } - - @Override - public String getPortIdentifier() { - return Builder.this.getPortIdentifier(); - } - - @Override - public long getPenalizationPeriod(final TimeUnit timeUnit) { - return Builder.this.getPenalizationPeriod(timeUnit); - } - - @Override - public File getPeerPersistenceFile() { - return Builder.this.getPeerPersistenceFile(); - } - - @Override - public EventReporter getEventReporter() { - return Builder.this.getEventReporter(); - } - - @Override - public long getPreferredBatchDuration(final TimeUnit timeUnit) { - return timeUnit.convert(batchNanos, TimeUnit.NANOSECONDS); - } - - @Override - public long getPreferredBatchSize() { - return batchSize; - } - - @Override - public int getPreferredBatchCount() { - return batchCount; - } - - @Override - public String getKeystoreFilename() { - return keystoreFilename; - } - - @Override - public String getKeystorePassword() { - return keystorePass; - } - - @Override - public KeystoreType getKeystoreType() { - return keystoreType; - } - - @Override - public String getTruststoreFilename() { - return truststoreFilename; - } - - @Override - public String getTruststorePassword() { - return truststorePass; - } - - @Override - public KeystoreType getTruststoreType() { - return truststoreType; - } - }; + return new StandardSiteToSiteClientConfig(this); } /** @@ -764,8 +667,168 @@ public interface SiteToSiteClient extends Closeable { } - public abstract class SerializableSiteToSiteClientConfig implements SiteToSiteClientConfig, Serializable { + class StandardSiteToSiteClientConfig implements SiteToSiteClientConfig, Serializable { + private static final long serialVersionUID = 1L; + private final String url; + private final long timeoutNanos; + private final long penalizationNanos; + private final long idleExpirationNanos; + private final SSLContext sslContext; + private final String keystoreFilename; + private final String keystorePass; + private final KeystoreType keystoreType; + private final String truststoreFilename; + private final String truststorePass; + private final KeystoreType truststoreType; + private final EventReporter eventReporter; + private final File peerPersistenceFile; + private final boolean useCompression; + private final String portName; + private final String portIdentifier; + private final int batchCount; + private final long batchSize; + private final long batchNanos; + + // some serialization frameworks require a default constructor + private StandardSiteToSiteClientConfig() { + this.url = null; + this.timeoutNanos = 0; + this.penalizationNanos = 0; + this.idleExpirationNanos = 0; + this.sslContext = null; + this.keystoreFilename = null; + this.keystorePass = null; + this.keystoreType = null; + this.truststoreFilename = null; + this.truststorePass = null; + this.truststoreType = null; + this.eventReporter = null; + this.peerPersistenceFile = null; + this.useCompression = false; + this.portName = null; + this.portIdentifier = null; + this.batchCount = 0; + this.batchSize = 0; + this.batchNanos = 0; + } + + private StandardSiteToSiteClientConfig(final SiteToSiteClient.Builder builder) { + this.url = builder.url; + this.timeoutNanos = builder.timeoutNanos; + this.penalizationNanos = builder.penalizationNanos; + this.idleExpirationNanos = builder.idleExpirationNanos; + this.sslContext = builder.sslContext; + this.keystoreFilename = builder.keystoreFilename; + this.keystorePass = builder.keystorePass; + this.keystoreType = builder.keystoreType; + this.truststoreFilename = builder.truststoreFilename; + this.truststorePass = builder.truststorePass; + this.truststoreType = builder.truststoreType; + this.eventReporter = builder.eventReporter; + this.peerPersistenceFile = builder.peerPersistenceFile; + this.useCompression = builder.useCompression; + this.portName = builder.portName; + this.portIdentifier = builder.portIdentifier; + this.batchCount = builder.batchCount; + this.batchSize = builder.batchSize; + this.batchNanos = builder.batchNanos; + } + + @Override + public boolean isUseCompression() { + return useCompression; + } + + @Override + public String getUrl() { + return url; + } + + @Override + public long getTimeout(final TimeUnit timeUnit) { + return timeUnit.convert(timeoutNanos, TimeUnit.NANOSECONDS); + } + + @Override + public long getIdleConnectionExpiration(final TimeUnit timeUnit) { + return timeUnit.convert(idleExpirationNanos, TimeUnit.NANOSECONDS); + } + + @Override + public SSLContext getSslContext() { + return sslContext; + } + + @Override + public String getPortName() { + return portName; + } + + @Override + public String getPortIdentifier() { + return portIdentifier; + } + + @Override + public long getPenalizationPeriod(final TimeUnit timeUnit) { + return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS); + } + + @Override + public File getPeerPersistenceFile() { + return peerPersistenceFile; + } + + @Override + public EventReporter getEventReporter() { + return eventReporter; + } + + @Override + public long getPreferredBatchDuration(final TimeUnit timeUnit) { + return timeUnit.convert(batchNanos, TimeUnit.NANOSECONDS); + } + + @Override + public long getPreferredBatchSize() { + return batchSize; + } + + @Override + public int getPreferredBatchCount() { + return batchCount; + } + + @Override + public String getKeystoreFilename() { + return keystoreFilename; + } + + @Override + public String getKeystorePassword() { + return keystorePass; + } + + @Override + public KeystoreType getKeystoreType() { + return keystoreType; + } + + @Override + public String getTruststoreFilename() { + return truststoreFilename; + } + + @Override + public String getTruststorePassword() { + return truststorePass; + } + + @Override + public KeystoreType getTruststoreType() { + return truststoreType; + } } } diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java index 4938f20a8e..194a167039 100644 --- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java @@ -16,22 +16,27 @@ */ package org.apache.nifi.remote.client.socket; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; +import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.remote.util.StandardDataPacket; +import org.apache.nifi.stream.io.ByteArrayOutputStream; +import org.apache.nifi.stream.io.StreamUtils; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.HashMap; import java.util.Map; -import org.apache.nifi.remote.Transaction; -import org.apache.nifi.remote.TransferDirection; -import org.apache.nifi.remote.client.SiteToSiteClient; -import org.apache.nifi.remote.protocol.DataPacket; -import org.apache.nifi.remote.util.StandardDataPacket; -import org.apache.nifi.stream.io.StreamUtils; -import org.junit.Assert; -import org.junit.Ignore; -import org.junit.Test; - public class TestSiteToSiteClient { @Test @@ -100,4 +105,33 @@ public class TestSiteToSiteClient { } } + @Test + public void testSerialization() { + final SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder() + .url("http://localhost:8080/nifi") + .portName("input") + .buildConfig(); + + final Kryo kryo = new Kryo(); + + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final Output output = new Output(out); + + try { + kryo.writeObject(output, clientConfig); + } finally { + output.close(); + } + + final ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); + final Input input = new Input(in); + + try { + SiteToSiteClientConfig clientConfig2 = kryo.readObject(input, SiteToSiteClient.StandardSiteToSiteClientConfig.class); + Assert.assertEquals(clientConfig.getUrl(), clientConfig2.getUrl()); + } finally { + input.close(); + } + } + }