mirror of https://github.com/apache/nifi.git
NIFI-4208: Fixed bug in SchemaRepositoryRecordSerde that would return null from deserializeEdit if there was no data; the interface documents that null cannot be returned from this method, as it is called only when data is expected to exist. As a result, if there is no data, we should throw EOFException instead, and the write-ahead log will handle that appropriately.
This closes #2086. Signed-off-by: Andy LoPresto <alopresto@apache.org>
This commit is contained in:
parent
a1706d12f5
commit
abca9d1464
|
@ -19,6 +19,7 @@ package org.apache.nifi.controller.repository;
|
||||||
|
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.DataOutputStream;
|
import java.io.DataOutputStream;
|
||||||
|
import java.io.EOFException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -105,7 +106,16 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RepositoryRecord deserializeEdit(final DataInputStream in, final Map<Object, RepositoryRecord> currentRecordStates, final int version) throws IOException {
|
public RepositoryRecord deserializeEdit(final DataInputStream in, final Map<Object, RepositoryRecord> currentRecordStates, final int version) throws IOException {
|
||||||
return deserializeRecord(in, version);
|
final RepositoryRecord record = deserializeRecord(in, version);
|
||||||
|
if (record != null) {
|
||||||
|
return record;
|
||||||
|
}
|
||||||
|
|
||||||
|
// deserializeRecord may return a null if there is no more data. However, when we are deserializing
|
||||||
|
// an edit, we do so only when we know that we should have data. This is why the JavaDocs for this method
|
||||||
|
// on the interface indicate that this method should never return null. As a result, if there is no data
|
||||||
|
// available, we handle this by throwing an EOFException.
|
||||||
|
throw new EOFException();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -21,6 +21,7 @@ import org.apache.nifi.controller.queue.FlowFileQueue;
|
||||||
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
|
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
|
||||||
import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema;
|
import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -28,6 +29,7 @@ import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.DataOutputStream;
|
import java.io.DataOutputStream;
|
||||||
|
import java.io.EOFException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -35,6 +37,7 @@ import java.util.Map;
|
||||||
import static org.apache.nifi.controller.repository.RepositoryRecordType.SWAP_IN;
|
import static org.apache.nifi.controller.repository.RepositoryRecordType.SWAP_IN;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotEquals;
|
import static org.junit.Assert.assertNotEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@ -244,6 +247,29 @@ public class SchemaRepositoryRecordSerdeTest {
|
||||||
assertEquals(SWAP_IN, repositoryRecord.getType());
|
assertEquals(SWAP_IN, repositoryRecord.getType());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEOFExceptionOnDeserializeEdit() throws IOException {
|
||||||
|
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
|
||||||
|
|
||||||
|
DataInputStream dataInputStream = createDataInputStream();
|
||||||
|
schemaRepositoryRecordSerde.readHeader(dataInputStream);
|
||||||
|
|
||||||
|
// calling deserializeRecord on an empty stream should return a null record.
|
||||||
|
RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
|
||||||
|
assertNull(repositoryRecord);
|
||||||
|
|
||||||
|
dataInputStream = createDataInputStream();
|
||||||
|
schemaRepositoryRecordSerde.readHeader(dataInputStream);
|
||||||
|
|
||||||
|
// calling deserializeEdit on an empty stream should throw EOFException
|
||||||
|
try {
|
||||||
|
schemaRepositoryRecordSerde.deserializeEdit(dataInputStream, new HashMap<>(), 2);
|
||||||
|
Assert.fail("Expected EOFException");
|
||||||
|
} catch (final EOFException eof) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private DataInputStream createDataInputStream() throws IOException {
|
private DataInputStream createDataInputStream() throws IOException {
|
||||||
dataOutputStream.flush();
|
dataOutputStream.flush();
|
||||||
return new DataInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
|
return new DataInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
|
||||||
|
|
Loading…
Reference in New Issue