HADOOP-17945. JsonSerialization raises EOFException reading JSON data stored on google GCS (#3501)

Contributed By: Steve Loughran
This commit is contained in:
Steve Loughran 2021-10-19 11:03:37 +01:00 committed by GitHub
parent cb8c98fbb0
commit 2194b9714e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 80 additions and 20 deletions

View File

@ -64,7 +64,13 @@ public class PathIOException extends IOException {
this.path = path; this.path = path;
} }
protected PathIOException(String path, String error, Throwable cause) { /**
* Use a subclass of PathIOException if possible.
* @param path for the exception
* @param error custom string to use as the error text
* @param cause cause of exception.
*/
public PathIOException(String path, String error, Throwable cause) {
super(error, cause); super(error, cause);
this.path = path; this.path = path;
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.util; package org.apache.hadoop.util;
import javax.annotation.Nullable;
import java.io.EOFException; import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
@ -42,8 +43,13 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
/** /**
* Support for marshalling objects to and from JSON. * Support for marshalling objects to and from JSON.
@ -228,30 +234,44 @@ public class JsonSerialization<T> {
/** /**
* Load from a Hadoop filesystem. * Load from a Hadoop filesystem.
* There's a check for data availability after the file is open, by
* raising an EOFException if stream.available == 0.
* This allows for a meaningful exception without the round trip overhead
* of a getFileStatus call before opening the file. It may be brittle
* against an FS stream which doesn't return a value here, but the
* standard filesystems all do.
* JSON parsing and mapping problems
* are converted to IOEs.
* @param fs filesystem * @param fs filesystem
* @param path path * @param path path
* @return a loaded object * @return a loaded object
* @throws IOException IO or JSON parse problems * @throws PathIOException JSON parse problem
* @throws IOException IO problems
*/ */
public T load(FileSystem fs, Path path) throws IOException { public T load(FileSystem fs, Path path) throws IOException {
try (FSDataInputStream dataInputStream = fs.open(path)) { return load(fs, path, null);
// throw an EOF exception if there is no data available. }
if (dataInputStream.available() == 0) {
/**
* Load from a Hadoop filesystem.
* If a file status is supplied, it's passed in to the openFile()
* call so that FS implementations can optimize their opening.
* @param fs filesystem
* @param path path
* @param status status of the file to open; may be null.
* @return a loaded object
* @throws PathIOException JSON parse problem
* @throws EOFException file status references an empty file
* @throws IOException IO problems
*/
public T load(FileSystem fs, Path path, @Nullable FileStatus status)
throws IOException {
if (status != null && status.getLen() == 0) {
throw new EOFException("No data in " + path); throw new EOFException("No data in " + path);
} }
FutureDataInputStreamBuilder builder = fs.openFile(path);
if (status != null) {
builder.withFileStatus(status);
}
try (FSDataInputStream dataInputStream =
awaitFuture(builder.build())) {
return fromJsonStream(dataInputStream); return fromJsonStream(dataInputStream);
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
throw new IOException( throw new PathIOException(path.toString(),
String.format("Failed to read JSON file \"%s\": %s", path, e), "Failed to read JSON file " + e, e);
e);
} }
} }

View File

@ -28,9 +28,11 @@ import com.fasterxml.jackson.core.JsonParseException;
import org.junit.Test; import org.junit.Test;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.test.HadoopTestBase; import org.apache.hadoop.test.HadoopTestBase;
import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.test.LambdaTestUtils;
@ -151,6 +153,9 @@ public class TestJsonSerialization extends HadoopTestBase {
} }
} }
/**
* Round trip through both load APIs: load(fs, path) and
* load(fs, path, status).
*/
@Test @Test
public void testFileSystemRoundTrip() throws Throwable { public void testFileSystemRoundTrip() throws Throwable {
File tempFile = File.createTempFile("Keyval", ".json"); File tempFile = File.createTempFile("Keyval", ".json");
@ -159,19 +164,30 @@ public class TestJsonSerialization extends HadoopTestBase {
LocalFileSystem fs = FileSystem.getLocal(new Configuration()); LocalFileSystem fs = FileSystem.getLocal(new Configuration());
try { try {
serDeser.save(fs, tempPath, source, false); serDeser.save(fs, tempPath, source, false);
assertEquals(source, serDeser.load(fs, tempPath)); assertEquals("JSON loaded with load(fs, path)",
source,
serDeser.load(fs, tempPath));
assertEquals("JSON loaded with load(fs, path, status)",
source,
serDeser.load(fs, tempPath, fs.getFileStatus(tempPath)));
} finally { } finally {
fs.delete(tempPath, false); fs.delete(tempPath, false);
} }
} }
/**
* 0 byte file through the load(path) API will fail with a wrapped
* Parser exception.
* 0 byte file through the load(path, status) API will instead fail with
* an EOFException; that case is covered in testFileSystemEmptyStatus.
*/
@Test @Test
public void testFileSystemEmptyPath() throws Throwable { public void testFileSystemEmptyPath() throws Throwable {
File tempFile = File.createTempFile("Keyval", ".json"); File tempFile = File.createTempFile("Keyval", ".json");
Path tempPath = new Path(tempFile.toURI()); Path tempPath = new Path(tempFile.toURI());
LocalFileSystem fs = FileSystem.getLocal(new Configuration()); LocalFileSystem fs = FileSystem.getLocal(new Configuration());
try { try {
LambdaTestUtils.intercept(EOFException.class, LambdaTestUtils.intercept(PathIOException.class,
() -> serDeser.load(fs, tempPath)); () -> serDeser.load(fs, tempPath));
fs.delete(tempPath, false); fs.delete(tempPath, false);
LambdaTestUtils.intercept(FileNotFoundException.class, LambdaTestUtils.intercept(FileNotFoundException.class,
@ -181,5 +197,23 @@ public class TestJsonSerialization extends HadoopTestBase {
} }
} }
/**
* 0 byte file through the load(path, status) API will fail with an
* EOFException.
*/
@Test
public void testFileSystemEmptyStatus() throws Throwable {
File tempFile = File.createTempFile("Keyval", ".json");
Path tempPath = new Path(tempFile.toURI());
LocalFileSystem fs = FileSystem.getLocal(new Configuration());
try {
final FileStatus st = fs.getFileStatus(tempPath);
LambdaTestUtils.intercept(EOFException.class,
() -> serDeser.load(fs, tempPath, st));
} finally {
fs.delete(tempPath, false);
}
}
} }