mirror of https://github.com/apache/nifi.git
NIFI-1284 Creating inner class for SiteToSiteClientConfig to fix serialization issue
This commit is contained in:
parent
bd4f31a4c2
commit
4249fc943a
|
@ -50,5 +50,11 @@
|
|||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.esotericsoftware.kryo</groupId>
|
||||
<artifactId>kryo</artifactId>
|
||||
<version>2.24.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
|
@ -513,104 +513,7 @@ public interface SiteToSiteClient extends Closeable {
|
|||
* but does not create a SiteToSiteClient
|
||||
*/
|
||||
public SiteToSiteClientConfig buildConfig() {
|
||||
return new SiteToSiteClientConfig() {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
public boolean isUseCompression() {
|
||||
return Builder.this.isUseCompression();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getUrl() {
|
||||
return Builder.this.getUrl();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getTimeout(final TimeUnit timeUnit) {
|
||||
return Builder.this.getTimeout(timeUnit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getIdleConnectionExpiration(final TimeUnit timeUnit) {
|
||||
return Builder.this.getIdleConnectionExpiration(timeUnit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SSLContext getSslContext() {
|
||||
return Builder.this.getSslContext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPortName() {
|
||||
return Builder.this.getPortName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPortIdentifier() {
|
||||
return Builder.this.getPortIdentifier();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getPenalizationPeriod(final TimeUnit timeUnit) {
|
||||
return Builder.this.getPenalizationPeriod(timeUnit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public File getPeerPersistenceFile() {
|
||||
return Builder.this.getPeerPersistenceFile();
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventReporter getEventReporter() {
|
||||
return Builder.this.getEventReporter();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getPreferredBatchDuration(final TimeUnit timeUnit) {
|
||||
return timeUnit.convert(batchNanos, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getPreferredBatchSize() {
|
||||
return batchSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPreferredBatchCount() {
|
||||
return batchCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKeystoreFilename() {
|
||||
return keystoreFilename;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKeystorePassword() {
|
||||
return keystorePass;
|
||||
}
|
||||
|
||||
@Override
|
||||
public KeystoreType getKeystoreType() {
|
||||
return keystoreType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTruststoreFilename() {
|
||||
return truststoreFilename;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTruststorePassword() {
|
||||
return truststorePass;
|
||||
}
|
||||
|
||||
@Override
|
||||
public KeystoreType getTruststoreType() {
|
||||
return truststoreType;
|
||||
}
|
||||
};
|
||||
return new StandardSiteToSiteClientConfig(this);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -764,8 +667,168 @@ public interface SiteToSiteClient extends Closeable {
|
|||
}
|
||||
|
||||
|
||||
public abstract class SerializableSiteToSiteClientConfig implements SiteToSiteClientConfig, Serializable {
|
||||
class StandardSiteToSiteClientConfig implements SiteToSiteClientConfig, Serializable {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private final String url;
|
||||
private final long timeoutNanos;
|
||||
private final long penalizationNanos;
|
||||
private final long idleExpirationNanos;
|
||||
private final SSLContext sslContext;
|
||||
private final String keystoreFilename;
|
||||
private final String keystorePass;
|
||||
private final KeystoreType keystoreType;
|
||||
private final String truststoreFilename;
|
||||
private final String truststorePass;
|
||||
private final KeystoreType truststoreType;
|
||||
private final EventReporter eventReporter;
|
||||
private final File peerPersistenceFile;
|
||||
private final boolean useCompression;
|
||||
private final String portName;
|
||||
private final String portIdentifier;
|
||||
private final int batchCount;
|
||||
private final long batchSize;
|
||||
private final long batchNanos;
|
||||
|
||||
// some serialization frameworks require a default constructor
|
||||
private StandardSiteToSiteClientConfig() {
|
||||
this.url = null;
|
||||
this.timeoutNanos = 0;
|
||||
this.penalizationNanos = 0;
|
||||
this.idleExpirationNanos = 0;
|
||||
this.sslContext = null;
|
||||
this.keystoreFilename = null;
|
||||
this.keystorePass = null;
|
||||
this.keystoreType = null;
|
||||
this.truststoreFilename = null;
|
||||
this.truststorePass = null;
|
||||
this.truststoreType = null;
|
||||
this.eventReporter = null;
|
||||
this.peerPersistenceFile = null;
|
||||
this.useCompression = false;
|
||||
this.portName = null;
|
||||
this.portIdentifier = null;
|
||||
this.batchCount = 0;
|
||||
this.batchSize = 0;
|
||||
this.batchNanos = 0;
|
||||
}
|
||||
|
||||
private StandardSiteToSiteClientConfig(final SiteToSiteClient.Builder builder) {
|
||||
this.url = builder.url;
|
||||
this.timeoutNanos = builder.timeoutNanos;
|
||||
this.penalizationNanos = builder.penalizationNanos;
|
||||
this.idleExpirationNanos = builder.idleExpirationNanos;
|
||||
this.sslContext = builder.sslContext;
|
||||
this.keystoreFilename = builder.keystoreFilename;
|
||||
this.keystorePass = builder.keystorePass;
|
||||
this.keystoreType = builder.keystoreType;
|
||||
this.truststoreFilename = builder.truststoreFilename;
|
||||
this.truststorePass = builder.truststorePass;
|
||||
this.truststoreType = builder.truststoreType;
|
||||
this.eventReporter = builder.eventReporter;
|
||||
this.peerPersistenceFile = builder.peerPersistenceFile;
|
||||
this.useCompression = builder.useCompression;
|
||||
this.portName = builder.portName;
|
||||
this.portIdentifier = builder.portIdentifier;
|
||||
this.batchCount = builder.batchCount;
|
||||
this.batchSize = builder.batchSize;
|
||||
this.batchNanos = builder.batchNanos;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isUseCompression() {
|
||||
return useCompression;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getUrl() {
|
||||
return url;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getTimeout(final TimeUnit timeUnit) {
|
||||
return timeUnit.convert(timeoutNanos, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getIdleConnectionExpiration(final TimeUnit timeUnit) {
|
||||
return timeUnit.convert(idleExpirationNanos, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SSLContext getSslContext() {
|
||||
return sslContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPortName() {
|
||||
return portName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPortIdentifier() {
|
||||
return portIdentifier;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getPenalizationPeriod(final TimeUnit timeUnit) {
|
||||
return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public File getPeerPersistenceFile() {
|
||||
return peerPersistenceFile;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventReporter getEventReporter() {
|
||||
return eventReporter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getPreferredBatchDuration(final TimeUnit timeUnit) {
|
||||
return timeUnit.convert(batchNanos, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getPreferredBatchSize() {
|
||||
return batchSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPreferredBatchCount() {
|
||||
return batchCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKeystoreFilename() {
|
||||
return keystoreFilename;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKeystorePassword() {
|
||||
return keystorePass;
|
||||
}
|
||||
|
||||
@Override
|
||||
public KeystoreType getKeystoreType() {
|
||||
return keystoreType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTruststoreFilename() {
|
||||
return truststoreFilename;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTruststorePassword() {
|
||||
return truststorePass;
|
||||
}
|
||||
|
||||
@Override
|
||||
public KeystoreType getTruststoreType() {
|
||||
return truststoreType;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,22 +16,27 @@
|
|||
*/
|
||||
package org.apache.nifi.remote.client.socket;
|
||||
|
||||
import com.esotericsoftware.kryo.Kryo;
|
||||
import com.esotericsoftware.kryo.io.Input;
|
||||
import com.esotericsoftware.kryo.io.Output;
|
||||
import org.apache.nifi.remote.Transaction;
|
||||
import org.apache.nifi.remote.TransferDirection;
|
||||
import org.apache.nifi.remote.client.SiteToSiteClient;
|
||||
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
|
||||
import org.apache.nifi.remote.protocol.DataPacket;
|
||||
import org.apache.nifi.remote.util.StandardDataPacket;
|
||||
import org.apache.nifi.stream.io.ByteArrayOutputStream;
|
||||
import org.apache.nifi.stream.io.StreamUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.nifi.remote.Transaction;
|
||||
import org.apache.nifi.remote.TransferDirection;
|
||||
import org.apache.nifi.remote.client.SiteToSiteClient;
|
||||
import org.apache.nifi.remote.protocol.DataPacket;
|
||||
import org.apache.nifi.remote.util.StandardDataPacket;
|
||||
import org.apache.nifi.stream.io.StreamUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestSiteToSiteClient {
|
||||
|
||||
@Test
|
||||
|
@ -100,4 +105,33 @@ public class TestSiteToSiteClient {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSerialization() {
|
||||
final SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
|
||||
.url("http://localhost:8080/nifi")
|
||||
.portName("input")
|
||||
.buildConfig();
|
||||
|
||||
final Kryo kryo = new Kryo();
|
||||
|
||||
final ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
final Output output = new Output(out);
|
||||
|
||||
try {
|
||||
kryo.writeObject(output, clientConfig);
|
||||
} finally {
|
||||
output.close();
|
||||
}
|
||||
|
||||
final ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
|
||||
final Input input = new Input(in);
|
||||
|
||||
try {
|
||||
SiteToSiteClientConfig clientConfig2 = kryo.readObject(input, SiteToSiteClient.StandardSiteToSiteClientConfig.class);
|
||||
Assert.assertEquals(clientConfig.getUrl(), clientConfig2.getUrl());
|
||||
} finally {
|
||||
input.close();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue