NIFI-6760: When writing/reading the length of a DataFrame, do so usin… (#3801)

* NIFI-6760: When writing/reading the length of a DataFrame, do so using a 4-byte integer instead of a 2-byte short. When using uncompressed data, we know that the length of the DataFrame will be no more than 64 KB so a 2-byte short is sufficient. However, if data is compresed, there's a chance that the compressed form of the data will be larger than the uncompressed form (for example, with random binary data or with encrypted data). In this situation, we can end up overflowing the 2-byte short, which causes the length that is written to be wrong. Using a 4-byte integer avoids this situation.

* NIFI-6760: Fixed unit tests
This commit is contained in:
markap14 2019-10-10 10:19:21 -04:00 committed by Bryan Bende
parent 4e8dd6557f
commit d64f4fa942
4 changed files with 11 additions and 12 deletions

View File

@ -319,16 +319,15 @@ public class LoadBalanceSession {
final byte[] compressed = compressDataFrame(byteBuffer, bytesRead);
final int compressedMaxLen = compressed.length;
buffer = ByteBuffer.allocate(3 + compressedMaxLen);
buffer = ByteBuffer.allocate(5 + compressedMaxLen);
buffer.put((byte) LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
buffer.putShort((short) compressedMaxLen);
buffer.putInt(compressedMaxLen);
buffer.put(compressed, 0, compressedMaxLen);
} else {
buffer = ByteBuffer.allocate(3 + bytesRead);
buffer = ByteBuffer.allocate(5 + bytesRead);
buffer.put((byte) LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
buffer.putShort((short) bytesRead);
buffer.putInt(bytesRead);
buffer.put(byteBuffer, 0, bytesRead);
}

View File

@ -499,7 +499,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
throw new IOException("Expected a Data Frame Indicator from Peer " + peerDescription + " but received a value of " + dataFrameIndicator);
}
int dataFrameLength = in.readUnsignedShort();
int dataFrameLength = in.readInt();
logger.trace("Received Data Frame Length of {} for {}", dataFrameLength, peerDescription);
byte[] buffer = getDataBuffer();
@ -535,7 +535,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
throw new IOException("Expected a Data Frame Indicator from Peer " + peerDescription + " but received a value of " + dataFrameIndicator);
}
dataFrameLength = in.readUnsignedShort();
dataFrameLength = in.readInt();
logger.trace("Received Data Frame Length of {} for {}", dataFrameLength, peerDescription);
}

View File

@ -157,7 +157,7 @@ public class TestLoadBalanceSession {
expectedDos.writeLong(flowFile1.getLineageStartDate()); // lineage start date
expectedDos.writeLong(flowFile1.getEntryDate()); // entry date
expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
expectedDos.writeShort(5);
expectedDos.writeInt(5);
expectedDos.write("hello".getBytes());
expectedDos.write(LoadBalanceProtocolConstants.NO_DATA_FRAME);
@ -171,7 +171,7 @@ public class TestLoadBalanceSession {
expectedDos.writeLong(flowFile2.getLineageStartDate()); // lineage start date
expectedDos.writeLong(flowFile2.getEntryDate()); // entry date
expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
expectedDos.writeShort(8);
expectedDos.writeInt(8);
expectedDos.write("good-bye".getBytes());
expectedDos.write(LoadBalanceProtocolConstants.NO_DATA_FRAME);
@ -246,12 +246,12 @@ public class TestLoadBalanceSession {
// first data frame
expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
expectedDos.writeShort(LoadBalanceSession.MAX_DATA_FRAME_SIZE);
expectedDos.writeInt(LoadBalanceSession.MAX_DATA_FRAME_SIZE);
expectedDos.write(Arrays.copyOfRange(content, 0, LoadBalanceSession.MAX_DATA_FRAME_SIZE));
// second data frame
expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
expectedDos.writeShort(content.length - LoadBalanceSession.MAX_DATA_FRAME_SIZE);
expectedDos.writeInt(content.length - LoadBalanceSession.MAX_DATA_FRAME_SIZE);
expectedDos.write(Arrays.copyOfRange(content, LoadBalanceSession.MAX_DATA_FRAME_SIZE, content.length));
expectedDos.write(LoadBalanceProtocolConstants.NO_DATA_FRAME);

View File

@ -655,7 +655,7 @@ public class TestStandardLoadBalanceProtocol {
final int length = Math.min(content.length - offset, 65535);
out.write(DATA_FRAME_FOLLOWS);
out.writeShort(length);
out.writeInt(length);
out.write(content, offset, length);
}