diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java index c3496b83c2..0169d072c8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java @@ -319,16 +319,15 @@ public class LoadBalanceSession { final byte[] compressed = compressDataFrame(byteBuffer, bytesRead); final int compressedMaxLen = compressed.length; - buffer = ByteBuffer.allocate(3 + compressedMaxLen); + buffer = ByteBuffer.allocate(5 + compressedMaxLen); buffer.put((byte) LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS); - buffer.putShort((short) compressedMaxLen); + buffer.putInt(compressedMaxLen); buffer.put(compressed, 0, compressedMaxLen); - } else { - buffer = ByteBuffer.allocate(3 + bytesRead); + buffer = ByteBuffer.allocate(5 + bytesRead); buffer.put((byte) LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS); - buffer.putShort((short) bytesRead); + buffer.putInt(bytesRead); buffer.put(byteBuffer, 0, bytesRead); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java index d9fdd2a3d2..c29a9ec94a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java @@ -499,7 +499,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { throw new IOException("Expected a Data Frame Indicator from Peer " + peerDescription + " but received a value of " + dataFrameIndicator); } - int dataFrameLength = in.readUnsignedShort(); + int dataFrameLength = in.readInt(); logger.trace("Received Data Frame Length of {} for {}", dataFrameLength, peerDescription); byte[] buffer = getDataBuffer(); @@ -535,7 +535,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { throw new IOException("Expected a Data Frame Indicator from Peer " + peerDescription + " but received a value of " + dataFrameIndicator); } - dataFrameLength = in.readUnsignedShort(); + dataFrameLength = in.readInt(); logger.trace("Received Data Frame Length of {} for {}", dataFrameLength, peerDescription); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java index 20b6add055..cfe4e2bdfd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java @@ -157,7 +157,7 @@ public class TestLoadBalanceSession { expectedDos.writeLong(flowFile1.getLineageStartDate()); // lineage start date expectedDos.writeLong(flowFile1.getEntryDate()); // entry date expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS); - expectedDos.writeShort(5); + expectedDos.writeInt(5); expectedDos.write("hello".getBytes()); expectedDos.write(LoadBalanceProtocolConstants.NO_DATA_FRAME); @@ -171,7 +171,7 @@ public class TestLoadBalanceSession { expectedDos.writeLong(flowFile2.getLineageStartDate()); // lineage start date expectedDos.writeLong(flowFile2.getEntryDate()); // entry date expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS); - expectedDos.writeShort(8); + expectedDos.writeInt(8); expectedDos.write("good-bye".getBytes()); expectedDos.write(LoadBalanceProtocolConstants.NO_DATA_FRAME); @@ -246,12 +246,12 @@ public class TestLoadBalanceSession { // first data frame expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS); - expectedDos.writeShort(LoadBalanceSession.MAX_DATA_FRAME_SIZE); + expectedDos.writeInt(LoadBalanceSession.MAX_DATA_FRAME_SIZE); expectedDos.write(Arrays.copyOfRange(content, 0, LoadBalanceSession.MAX_DATA_FRAME_SIZE)); // second data frame expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS); - expectedDos.writeShort(content.length - LoadBalanceSession.MAX_DATA_FRAME_SIZE); + expectedDos.writeInt(content.length - LoadBalanceSession.MAX_DATA_FRAME_SIZE); expectedDos.write(Arrays.copyOfRange(content, LoadBalanceSession.MAX_DATA_FRAME_SIZE, content.length)); expectedDos.write(LoadBalanceProtocolConstants.NO_DATA_FRAME); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java index a5de8eea60..57fc7d934f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java @@ -655,7 +655,7 @@ public class TestStandardLoadBalanceProtocol { final int length = Math.min(content.length - offset, 65535); out.write(DATA_FRAME_FOLLOWS); - out.writeShort(length); + out.writeInt(length); out.write(content, offset, length); }