NIFI-6886: Fixed SiteToSiteReportingRecordSink, refactored mocks

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #3959.
This commit is contained in:
Matthew Burgess 2020-01-06 13:13:31 -05:00 committed by Pierre Villard
parent 1ee6dba00a
commit 9ed4623817
No known key found for this signature in database
GPG Key ID: BEE1599F0726E9CD
6 changed files with 61 additions and 63 deletions

View File

@ -117,7 +117,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
}
public void setup(final ReportingContext reportContext) throws IOException {
if (siteToSiteClient != null) {
if (siteToSiteClient == null) {
siteToSiteClient = SiteToSiteUtils.getClient(reportContext, getLogger());
}
}
@ -127,6 +127,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
final SiteToSiteClient client = getClient();
if (client != null) {
client.close();
siteToSiteClient = null;
}
}

View File

@ -21,6 +21,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
@ -61,6 +62,7 @@ public class SiteToSiteReportingRecordSink extends AbstractControllerService imp
private List<PropertyDescriptor> properties;
private volatile SiteToSiteClient siteToSiteClient;
private volatile RecordSetWriterFactory writerFactory;
private volatile StateManager stateManager;
@Override
protected void init(final ControllerServiceInitializationContext context) {
@ -79,6 +81,7 @@ public class SiteToSiteReportingRecordSink extends AbstractControllerService imp
properties.add(SiteToSiteUtils.HTTP_PROXY_USERNAME);
properties.add(SiteToSiteUtils.HTTP_PROXY_PASSWORD);
this.properties = Collections.unmodifiableList(properties);
this.stateManager = context.getStateManager();
}
@Override
@ -118,6 +121,7 @@ public class SiteToSiteReportingRecordSink extends AbstractControllerService imp
.useCompression(context.getProperty(SiteToSiteUtils.COMPRESS).asBoolean())
.eventReporter(eventReporter)
.sslContext(sslContext)
.stateManager(stateManager)
.timeout(context.getProperty(SiteToSiteUtils.TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
.transportProtocol(mode)
.httpProxy(httpProxy)

View File

@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@ -183,29 +184,25 @@ public class TestSiteToSiteBulletinReportingTask {
final List<byte[]> dataSent = new ArrayList<>();
@Override
protected SiteToSiteClient getClient() {
final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
final Transaction transaction = Mockito.mock(Transaction.class);
public void setup(ReportingContext reportContext) throws IOException {
if(siteToSiteClient == null) {
final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
final Transaction transaction = Mockito.mock(Transaction.class);
try {
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable {
try {
Mockito.doAnswer((Answer<Object>) invocation -> {
final byte[] data = invocation.getArgument(0, byte[].class);
dataSent.add(data);
return null;
}
}).when(transaction).send(Mockito.any(byte[].class), Mockito.anyMap());
}).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
} catch (final Exception e) {
e.printStackTrace();
Assert.fail(e.toString());
when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
} catch (final Exception e) {
e.printStackTrace();
Assert.fail(e.toString());
}
siteToSiteClient = client;
}
return client;
}
}
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.reporting;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@ -295,27 +296,25 @@ public class TestSiteToSiteMetricsReportingTask {
final List<byte[]> dataSent = new ArrayList<>();
@Override
protected SiteToSiteClient getClient() {
final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
final Transaction transaction = Mockito.mock(Transaction.class);
public void setup(ReportingContext reportContext) throws IOException {
if(siteToSiteClient == null) {
final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
final Transaction transaction = Mockito.mock(Transaction.class);
try {
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable {
try {
Mockito.doAnswer((Answer<Object>) invocation -> {
final byte[] data = invocation.getArgument(0, byte[].class);
dataSent.add(data);
return null;
}
}).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
}).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
} catch (final Exception e) {
e.printStackTrace();
Assert.fail(e.toString());
when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
} catch (final Exception e) {
e.printStackTrace();
Assert.fail(e.toString());
}
siteToSiteClient = client;
}
return client;
}
}

View File

@ -677,27 +677,25 @@ public class TestSiteToSiteProvenanceReportingTask {
final List<byte[]> dataSent = new ArrayList<>();
@Override
protected SiteToSiteClient getClient() {
final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
final Transaction transaction = Mockito.mock(Transaction.class);
public void setup(ReportingContext reportContext) throws IOException {
if(siteToSiteClient == null) {
final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
final Transaction transaction = Mockito.mock(Transaction.class);
try {
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable {
try {
Mockito.doAnswer((Answer<Object>) invocation -> {
final byte[] data = invocation.getArgument(0, byte[].class);
dataSent.add(data);
return null;
}
}).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
}).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
} catch (final Exception e) {
e.printStackTrace();
Assert.fail(e.toString());
when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
} catch (final Exception e) {
e.printStackTrace();
Assert.fail(e.toString());
}
siteToSiteClient = client;
}
return client;
}
public List<byte[]> getDataSent() {

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@ -532,27 +533,25 @@ public class TestSiteToSiteStatusReportingTask {
final List<byte[]> dataSent = new ArrayList<>();
@Override
protected SiteToSiteClient getClient() {
final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
final Transaction transaction = Mockito.mock(Transaction.class);
public void setup(ReportingContext reportContext) throws IOException {
if(siteToSiteClient == null) {
final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
final Transaction transaction = Mockito.mock(Transaction.class);
try {
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable {
try {
Mockito.doAnswer((Answer<Object>) invocation -> {
final byte[] data = invocation.getArgument(0, byte[].class);
dataSent.add(data);
return null;
}
}).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
}).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
} catch (final Exception e) {
e.printStackTrace();
Assert.fail(e.toString());
when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
} catch (final Exception e) {
e.printStackTrace();
Assert.fail(e.toString());
}
siteToSiteClient = client;
}
return client;
}
public List<byte[]> getDataSent() {