NIFI-10630 Updated TestWaitNotifyProtocol comparing JSON Objects

This closes #6526

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
BoyuLi4 2022-10-13 19:56:15 -05:00 committed by exceptionfactory
parent 0f8a04f8d9
commit 56d26be3ca
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
1 changed files with 16 additions and 15 deletions

View File

@ -36,6 +36,7 @@ import java.util.Map;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import com.fasterxml.jackson.databind.ObjectMapper;
import static org.apache.nifi.processors.standard.WaitNotifyProtocol.CONSUMED_COUNT_NAME;
import static org.apache.nifi.processors.standard.WaitNotifyProtocol.DEFAULT_COUNT_NAME;
@ -52,6 +53,7 @@ import static org.mockito.Mockito.mock;
public class TestWaitNotifyProtocol {
private final Map<String, AtomicCacheEntry<String, String, Long>> cacheEntries = new HashMap<>();
private final ObjectMapper mapper = new ObjectMapper();
private AtomicDistributedMapCacheClient<Long> cache;
@SuppressWarnings("unchecked")
@ -105,7 +107,7 @@ public class TestWaitNotifyProtocol {
final AtomicCacheEntry<String, String, Long> cacheEntry = cacheEntries.get("signal-id");
assertEquals(1, cacheEntry.getRevision().orElse(-1L).longValue());
assertEquals("{\"counts\":{\"a\":1},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
assertValueEquals("{\"counts\":{\"a\":1},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
}
@Test
@ -122,21 +124,18 @@ public class TestWaitNotifyProtocol {
AtomicCacheEntry<String, String, Long> cacheEntry = cacheEntries.get("signal-id");
assertEquals(2, cacheEntry.getRevision().orElse(-1L).longValue());
assertEquals("{\"counts\":{\"a\":2},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
assertValueEquals("{\"counts\":{\"a\":2},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
protocol.notify(signalId, "a", 10, null);
cacheEntry = cacheEntries.get("signal-id");
assertEquals(3, cacheEntry.getRevision().orElse(-1L).longValue());
assertEquals("{\"counts\":{\"a\":12},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
assertValueEquals("{\"counts\":{\"a\":12},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
protocol.notify(signalId, "b", 2, null);
protocol.notify(signalId, "c", 3, null);
cacheEntry = cacheEntries.get("signal-id");
assertEquals(5, cacheEntry.getRevision().orElse(-1L).longValue());
assertEquals("{\"counts\":{\"a\":12,\"b\":2,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
assertValueEquals("{\"counts\":{\"a\":12,\"b\":2,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
final Map<String, Integer> deltas = new HashMap<>();
deltas.put("a", 10);
deltas.put("b", 25);
@ -144,13 +143,12 @@ public class TestWaitNotifyProtocol {
cacheEntry = cacheEntries.get("signal-id");
assertEquals(6, cacheEntry.getRevision().orElse(-1L).longValue());
assertEquals("{\"counts\":{\"a\":22,\"b\":27,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
assertValueEquals("{\"counts\":{\"a\":22,\"b\":27,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
// Zero clear 'b'.
protocol.notify("signal-id", "b", 0, null);
cacheEntry = cacheEntries.get("signal-id");
assertEquals(7, cacheEntry.getRevision().orElse(-1L).longValue());
assertEquals("{\"counts\":{\"a\":22,\"b\":0,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
assertValueEquals("{\"counts\":{\"a\":22,\"b\":0,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
}
@ -170,7 +168,7 @@ public class TestWaitNotifyProtocol {
AtomicCacheEntry<String, String, Long> cacheEntry = cacheEntries.get("signal-id");
assertEquals(1L, cacheEntry.getRevision().orElse(-1L).longValue());
assertEquals("{\"counts\":{\"a\":1},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a1\"},\"releasableCount\":0}", cacheEntry.getValue());
assertValueEquals("{\"counts\":{\"a\":1},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a1\"},\"releasableCount\":0}", cacheEntry.getValue());
final Map<String, String> attributeA2 = new HashMap<>();
attributeA2.put("p2", "a2"); // Update p2
@ -181,9 +179,7 @@ public class TestWaitNotifyProtocol {
cacheEntry = cacheEntries.get("signal-id");
assertEquals(2L, cacheEntry.getRevision().orElse(-1L).longValue());
assertEquals("{\"counts\":{\"a\":2},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a2\",\"p3\":\"a2\"},\"releasableCount\":0}", cacheEntry.getValue(),
"Updated attributes should be merged correctly");
assertValueEquals("{\"counts\":{\"a\":2},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a2\",\"p3\":\"a2\"},\"releasableCount\":0}", cacheEntry.getValue());
}
@Test
@ -441,4 +437,9 @@ public class TestWaitNotifyProtocol {
assertEquals(22, releasableCount.getInt(signal)); // 32 - 10 = 22.
}
}
public void assertValueEquals(String expected, String value) throws Exception{
assertEquals(mapper.readTree(expected),mapper.readTree(value));
}
}