From 89c757999b367b63bb6a2009f713b2b3af7fd905 Mon Sep 17 00:00:00 2001 From: David Handermann Date: Wed, 14 Aug 2024 13:44:21 -0500 Subject: [PATCH] NIFI-13652 Added Range Header Handling for Content Download (#9172) This closes #9172 --- .../nifi/web/NiFiWebApiResourceConfig.java | 2 + .../nifi/web/api/FlowFileQueueResource.java | 36 ++--- .../nifi/web/api/ProvenanceEventResource.java | 65 +++----- .../RangeNotSatisfiableExceptionMapper.java | 45 ++++++ .../nifi/web/api/streaming/ByteRange.java | 55 +++++++ .../streaming/ByteRangeFormatException.java | 27 ++++ .../web/api/streaming/ByteRangeParser.java | 32 ++++ .../streaming/ByteRangeStreamingOutput.java | 90 +++++++++++ .../api/streaming/InputStreamingOutput.java | 48 ++++++ .../RangeNotSatisfiableException.java | 29 ++++ .../streaming/StandardByteRangeParser.java | 83 ++++++++++ .../StreamingOutputResponseBuilder.java | 132 ++++++++++++++++ .../ByteRangeStreamingOutputTest.java | 119 +++++++++++++++ .../StandardByteRangeParserTest.java | 142 ++++++++++++++++++ .../StreamingOutputResponseBuilderTest.java | 70 +++++++++ 15 files changed, 905 insertions(+), 70 deletions(-) create mode 100644 nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/RangeNotSatisfiableExceptionMapper.java create mode 100644 nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/ByteRange.java create mode 100644 nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/ByteRangeFormatException.java create mode 100644 nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/ByteRangeParser.java create mode 100644 nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/ByteRangeStreamingOutput.java create mode 100644 nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/InputStreamingOutput.java create mode 100644 nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/RangeNotSatisfiableException.java create mode 100644 nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/StandardByteRangeParser.java create mode 100644 nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/StreamingOutputResponseBuilder.java create mode 100644 nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/streaming/ByteRangeStreamingOutputTest.java create mode 100644 nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/streaming/StandardByteRangeParserTest.java create mode 100644 nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/streaming/StreamingOutputResponseBuilderTest.java diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiWebApiResourceConfig.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiWebApiResourceConfig.java index 2590e73646..bf65c1b723 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiWebApiResourceConfig.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiWebApiResourceConfig.java @@ -42,6 +42,7 @@ import org.apache.nifi.web.api.config.NoResponseFromNodesExceptionMapper; import org.apache.nifi.web.api.config.NodeDisconnectionExceptionMapper; import org.apache.nifi.web.api.config.NodeReconnectionExceptionMapper; import org.apache.nifi.web.api.config.NotFoundExceptionMapper; +import org.apache.nifi.web.api.config.RangeNotSatisfiableExceptionMapper; import org.apache.nifi.web.api.config.ResourceNotFoundExceptionMapper; import org.apache.nifi.web.api.config.ThrowableMapper; import org.apache.nifi.web.api.config.UnknownNodeExceptionMapper; @@ -131,6 +132,7 @@ public class NiFiWebApiResourceConfig extends ResourceConfig { register(NoResponseFromNodesExceptionMapper.class); register(NodeDisconnectionExceptionMapper.class); register(NodeReconnectionExceptionMapper.class); + register(RangeNotSatisfiableExceptionMapper.class); register(ResourceNotFoundExceptionMapper.class); register(NotFoundExceptionMapper.class); register(UnknownNodeExceptionMapper.class); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java index a641846e57..f860f7a55b 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java @@ -16,9 +16,6 @@ */ package org.apache.nifi.web.api; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.net.URI; import io.swagger.v3.oas.annotations.Operation; @@ -33,13 +30,13 @@ import jakarta.ws.rs.Consumes; import jakarta.ws.rs.DELETE; import jakarta.ws.rs.DefaultValue; import jakarta.ws.rs.GET; +import jakarta.ws.rs.HeaderParam; import jakarta.ws.rs.HttpMethod; import jakarta.ws.rs.POST; import jakarta.ws.rs.Path; import jakarta.ws.rs.PathParam; import jakarta.ws.rs.Produces; import jakarta.ws.rs.QueryParam; -import jakarta.ws.rs.WebApplicationException; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response; import jakarta.ws.rs.core.Response.Status; @@ -52,7 +49,6 @@ import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.web.DownloadableContent; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.api.dto.DropRequestDTO; @@ -65,6 +61,7 @@ import org.apache.nifi.web.api.entity.Entity; import org.apache.nifi.web.api.entity.FlowFileEntity; import org.apache.nifi.web.api.entity.ListingRequestEntity; import org.apache.nifi.web.api.request.ClientIdParameter; +import org.apache.nifi.web.api.streaming.StreamingOutputResponseBuilder; import org.apache.nifi.web.util.ResponseBuilderUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; @@ -194,7 +191,6 @@ public class FlowFileQueueResource extends ApplicationResource { * @param flowFileUuid The flowfile uuid * @param clusterNodeId The cluster node id * @return The content stream - * @throws InterruptedException if interrupted */ @GET @Consumes(MediaType.WILDCARD) @@ -209,14 +205,20 @@ public class FlowFileQueueResource extends ApplicationResource { ) @ApiResponses( value = { + @ApiResponse(responseCode = "206", description = "Partial Content with range of bytes requested"), @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), @ApiResponse(responseCode = "401", description = "Client could not be authenticated."), @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), @ApiResponse(responseCode = "404", description = "The specified resource could not be found."), - @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") + @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it."), + @ApiResponse(responseCode = "416", description = "Requested Range Not Satisfiable based on bytes requested") } ) public Response downloadFlowFileContent( + @Parameter( + description = "Range of bytes requested" + ) + @HeaderParam("Range") final String rangeHeader, @Parameter( description = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response." ) @@ -257,30 +259,14 @@ public class FlowFileQueueResource extends ApplicationResource { // get the uri of the request final String uri = generateResourceUri("flowfile-queues", connectionId, "flowfiles", flowFileUuid, "content"); - // get an input stream to the content final DownloadableContent content = serviceFacade.getContent(connectionId, flowFileUuid, uri); + final Response.ResponseBuilder responseBuilder = noCache(new StreamingOutputResponseBuilder(content.getContent()).range(rangeHeader).build()); - // generate a streaming response - final StreamingOutput response = new StreamingOutput() { - @Override - public void write(final OutputStream output) throws IOException, WebApplicationException { - try (InputStream is = content.getContent()) { - // stream the content to the response - StreamUtils.copy(is, output); - - // flush the response - output.flush(); - } - } - }; - - // use the appropriate content type String contentType = content.getType(); if (contentType == null) { contentType = MediaType.APPLICATION_OCTET_STREAM; } - - final Response.ResponseBuilder responseBuilder = generateOkResponse(response).type(contentType); + responseBuilder.type(contentType); return ResponseBuilderUtils.setContentDisposition(responseBuilder, content.getFilename()).build(); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceEventResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceEventResource.java index ec705ebb18..ff72e78535 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceEventResource.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceEventResource.java @@ -28,13 +28,13 @@ import jakarta.servlet.http.HttpServletRequest; import jakarta.ws.rs.Consumes; import jakarta.ws.rs.DefaultValue; import jakarta.ws.rs.GET; +import jakarta.ws.rs.HeaderParam; import jakarta.ws.rs.HttpMethod; import jakarta.ws.rs.POST; import jakarta.ws.rs.Path; import jakarta.ws.rs.PathParam; import jakarta.ws.rs.Produces; import jakarta.ws.rs.QueryParam; -import jakarta.ws.rs.WebApplicationException; import jakarta.ws.rs.core.Context; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response; @@ -48,7 +48,6 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.controller.repository.claim.ContentDirection; -import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.web.DownloadableContent; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; @@ -59,15 +58,13 @@ import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity; import org.apache.nifi.web.api.entity.ReplayLastEventSnapshotDTO; import org.apache.nifi.web.api.entity.SubmitReplayRequestEntity; import org.apache.nifi.web.api.request.LongParameter; +import org.apache.nifi.web.api.streaming.StreamingOutputResponseBuilder; import org.apache.nifi.web.util.ResponseBuilderUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.net.URI; import java.util.Collections; @@ -104,14 +101,20 @@ public class ProvenanceEventResource extends ApplicationResource { ) @ApiResponses( value = { + @ApiResponse(responseCode = "206", description = "Partial Content with range of bytes requested"), @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), @ApiResponse(responseCode = "401", description = "Client could not be authenticated."), @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), @ApiResponse(responseCode = "404", description = "The specified resource could not be found."), - @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") + @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it."), + @ApiResponse(responseCode = "416", description = "Requested Range Not Satisfiable based on bytes requested") } ) public Response getInputContent( + @Parameter( + description = "Range of bytes requested" + ) + @HeaderParam("Range") final String rangeHeader, @Parameter( description = "The id of the node where the content exists if clustered." ) @@ -140,30 +143,13 @@ public class ProvenanceEventResource extends ApplicationResource { // get the uri of the request final String uri = generateResourceUri("provenance", "events", String.valueOf(id.getLong()), "content", "input"); - // get an input stream to the content final DownloadableContent content = serviceFacade.getContent(id.getLong(), uri, ContentDirection.INPUT); - - // generate a streaming response - final StreamingOutput response = new StreamingOutput() { - @Override - public void write(OutputStream output) throws IOException, WebApplicationException { - try (InputStream is = content.getContent()) { - // stream the content to the response - StreamUtils.copy(is, output); - - // flush the response - output.flush(); - } - } - }; - - // use the appropriate content type + final Response.ResponseBuilder responseBuilder = noCache(new StreamingOutputResponseBuilder(content.getContent()).range(rangeHeader).build()); String contentType = content.getType(); if (contentType == null) { contentType = MediaType.APPLICATION_OCTET_STREAM; } - - final Response.ResponseBuilder responseBuilder = generateOkResponse(response).type(contentType); + responseBuilder.type(contentType); return ResponseBuilderUtils.setContentDisposition(responseBuilder, content.getFilename()).build(); } @@ -188,14 +174,20 @@ public class ProvenanceEventResource extends ApplicationResource { ) @ApiResponses( value = { + @ApiResponse(responseCode = "206", description = "Partial Content with range of bytes requested"), @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), @ApiResponse(responseCode = "401", description = "Client could not be authenticated."), @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), @ApiResponse(responseCode = "404", description = "The specified resource could not be found."), - @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") + @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it."), + @ApiResponse(responseCode = "416", description = "Requested Range Not Satisfiable based on bytes requested"), } ) public Response getOutputContent( + @Parameter( + description = "Range of bytes requested" + ) + @HeaderParam("Range") final String rangeHeader, @Parameter( description = "The id of the node where the content exists if clustered." ) @@ -224,30 +216,13 @@ public class ProvenanceEventResource extends ApplicationResource { // get the uri of the request final String uri = generateResourceUri("provenance", "events", String.valueOf(id.getLong()), "content", "output"); - // get an input stream to the content final DownloadableContent content = serviceFacade.getContent(id.getLong(), uri, ContentDirection.OUTPUT); - - // generate a streaming response - final StreamingOutput response = new StreamingOutput() { - @Override - public void write(OutputStream output) throws IOException, WebApplicationException { - try (InputStream is = content.getContent()) { - // stream the content to the response - StreamUtils.copy(is, output); - - // flush the response - output.flush(); - } - } - }; - - // use the appropriate content type + final Response.ResponseBuilder responseBuilder = noCache(new StreamingOutputResponseBuilder(content.getContent()).range(rangeHeader).build()); String contentType = content.getType(); if (contentType == null) { contentType = MediaType.APPLICATION_OCTET_STREAM; } - - final Response.ResponseBuilder responseBuilder = generateOkResponse(response).type(contentType); + responseBuilder.type(contentType); return ResponseBuilderUtils.setContentDisposition(responseBuilder, content.getFilename()).build(); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/RangeNotSatisfiableExceptionMapper.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/RangeNotSatisfiableExceptionMapper.java new file mode 100644 index 0000000000..34e89dc06e --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/RangeNotSatisfiableExceptionMapper.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.config; + +import jakarta.ws.rs.core.MediaType; +import org.apache.nifi.web.api.streaming.RangeNotSatisfiableException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.ext.ExceptionMapper; +import jakarta.ws.rs.ext.Provider; + +/** + * Map Range Not Satisfiable Exception to HTTP 416 Responses + */ +@Provider +public class RangeNotSatisfiableExceptionMapper implements ExceptionMapper { + + private static final Logger logger = LoggerFactory.getLogger(RangeNotSatisfiableExceptionMapper.class); + + @Override + public Response toResponse(final RangeNotSatisfiableException exception) { + logger.info("HTTP 416 Range Not Satisfiable: {}", exception.getMessage()); + + return Response.status(Response.Status.REQUESTED_RANGE_NOT_SATISFIABLE) + .entity(exception.getMessage()) + .type(MediaType.TEXT_PLAIN_TYPE) + .build(); + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/ByteRange.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/ByteRange.java new file mode 100644 index 0000000000..b6d95c937c --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/ByteRange.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.streaming; + +import java.util.Objects; +import java.util.OptionalLong; + +/** + * Range of bytes requested as described in RFC 9110 Section 14.1.2 with optional first and last positions + */ +public class ByteRange { + private final Long firstPosition; + + private final Long lastPosition; + + public ByteRange(final Long firstPosition, final Long lastPosition) { + if (firstPosition == null) { + Objects.requireNonNull(lastPosition, "Last Position required"); + } + this.firstPosition = firstPosition; + this.lastPosition = lastPosition; + } + + /** + * Get first position in byte range which can be empty indicating the last position must be specified + * + * @return First position starting with 0 or empty + */ + public OptionalLong getFirstPosition() { + return firstPosition == null ? OptionalLong.empty() : OptionalLong.of(firstPosition); + } + + /** + * Get last position in byte range which can empty indicating the first position must be specified + * + * @return Last position starting with 0 or empty + */ + public OptionalLong getLastPosition() { + return lastPosition == null ? OptionalLong.empty() : OptionalLong.of(lastPosition); + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/ByteRangeFormatException.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/ByteRangeFormatException.java new file mode 100644 index 0000000000..fb4f18cfa7 --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/ByteRangeFormatException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.streaming; + +/** + * Byte Range Format Exception indicating invalid units specified in Range Header + */ +public class ByteRangeFormatException extends IllegalArgumentException { + + public ByteRangeFormatException(final String message) { + super(message); + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/ByteRangeParser.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/ByteRangeParser.java new file mode 100644 index 0000000000..e2d3610921 --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/ByteRangeParser.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.streaming; + +import java.util.Optional; + +/** + * HTTP Range Header Parser abstraction supporting byte ranges as described in RFC 9110 Section 14.1.2 + */ +public interface ByteRangeParser { + /** + * Read Byte Range from HTTP Range Header + * + * @param rangeHeader HTTP Range Header + * @return Byte Range or empty when Range Header not provided + */ + Optional readByteRange(String rangeHeader); +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/ByteRangeStreamingOutput.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/ByteRangeStreamingOutput.java new file mode 100644 index 0000000000..2324bfac01 --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/ByteRangeStreamingOutput.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.streaming; + +import jakarta.ws.rs.WebApplicationException; +import jakarta.ws.rs.core.StreamingOutput; +import org.apache.nifi.stream.io.LimitingInputStream; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Objects; +import java.util.OptionalLong; + +/** + * Streaming Output implementation supporting HTTP Range Header with specified first and last byte positions + */ +public class ByteRangeStreamingOutput implements StreamingOutput { + private final InputStream inputStream; + + private final ByteRange byteRange; + + /** + * Byte Range Streaming Output with required arguments + * + * @param inputStream Input Stream to be transferred + * @param byteRange Byte Range containing first and last positions + */ + public ByteRangeStreamingOutput(final InputStream inputStream, final ByteRange byteRange) { + Objects.requireNonNull(inputStream, "Input Stream required"); + this.byteRange = Objects.requireNonNull(byteRange, "Byte Range required"); + + final OptionalLong lastPositionFound = byteRange.getLastPosition(); + if (lastPositionFound.isPresent()) { + final long lastPosition = lastPositionFound.getAsLong(); + + final OptionalLong firstPositionFound = byteRange.getFirstPosition(); + if (firstPositionFound.isPresent()) { + // Handle int-range when last position indicates limited number of bytes + this.inputStream = new LimitingInputStream(inputStream, lastPosition); + } else { + // Handle suffix-range when last position indicates the last number of bytes from the end + this.inputStream = inputStream; + } + } else { + this.inputStream = inputStream; + } + } + + @Override + public void write(final OutputStream outputStream) throws IOException, WebApplicationException { + try (inputStream) { + final OptionalLong firstPositionFound = byteRange.getFirstPosition(); + final OptionalLong lastPositionFound = byteRange.getLastPosition(); + + if (firstPositionFound.isPresent()) { + // Handle int-range with first position specified + final long firstPosition = firstPositionFound.getAsLong(); + try { + inputStream.skipNBytes(firstPosition); + } catch (final EOFException e) { + throw new RangeNotSatisfiableException("First Range Position [%d] not valid".formatted(firstPosition), e); + } + } else if (lastPositionFound.isPresent()) { + // Handle suffix-range for last number of bytes specified + final long lastPosition = lastPositionFound.getAsLong(); + final long available = inputStream.available(); + final long skip = available - lastPosition; + inputStream.skipNBytes(skip); + } + + inputStream.transferTo(outputStream); + } + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/InputStreamingOutput.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/InputStreamingOutput.java new file mode 100644 index 0000000000..ffcb07965e --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/InputStreamingOutput.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.streaming; + +import jakarta.ws.rs.WebApplicationException; +import jakarta.ws.rs.core.StreamingOutput; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Objects; + +/** + * Streaming Output implementation supporting direct transfer of Input Stream + */ +public class InputStreamingOutput implements StreamingOutput { + private final InputStream inputStream; + + /** + * Streaming Output with required arguments + * + * @param inputStream Input Stream to be transferred + */ + public InputStreamingOutput(final InputStream inputStream) { + this.inputStream = Objects.requireNonNull(inputStream, "Input Stream required"); + } + + @Override + public void write(final OutputStream outputStream) throws IOException, WebApplicationException { + try (inputStream) { + inputStream.transferTo(outputStream); + } + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/RangeNotSatisfiableException.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/RangeNotSatisfiableException.java new file mode 100644 index 0000000000..b290a26e97 --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/RangeNotSatisfiableException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.streaming; + +import java.io.IOException; + +/** + * Runtime Exception indicating that the requested range is outside the bounds of available content streams + */ +public class RangeNotSatisfiableException extends IOException { + + public RangeNotSatisfiableException(final String message, final Throwable cause) { + super(message, cause); + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/StandardByteRangeParser.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/StandardByteRangeParser.java new file mode 100644 index 0000000000..e985ab92df --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/StandardByteRangeParser.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.streaming; + +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Standard implementation of Byte Range Header Parser supporting one range specifier of bytes with int-range or suffix-range values + */ +public class StandardByteRangeParser implements ByteRangeParser { + private static final Pattern BYTE_RANGE_PATTERN = Pattern.compile("^bytes=(0|[1-9][0-9]{0,18})?-(0|[1-9][0-9]{0,18})?$"); + + private static final int FIRST_POSITION_GROUP = 1; + + private static final int LAST_POSITION_GROUP = 2; + + @Override + public Optional readByteRange(final String rangeHeader) { + final ByteRange byteRange; + + if (rangeHeader == null || rangeHeader.isBlank()) { + byteRange = null; + } else { + final Matcher matcher = BYTE_RANGE_PATTERN.matcher(rangeHeader); + if (matcher.matches()) { + final Long firstPosition; + final Long lastPosition; + + final String firstPositionGroup = matcher.group(FIRST_POSITION_GROUP); + final String lastPositionGroup = matcher.group(LAST_POSITION_GROUP); + + if (firstPositionGroup == null) { + if (lastPositionGroup == null) { + throw new ByteRangeFormatException("Range header missing first and last positions"); + } + firstPosition = null; + lastPosition = parsePosition(lastPositionGroup); + } else { + firstPosition = parsePosition(firstPositionGroup); + if (lastPositionGroup == null) { + lastPosition = null; + } else { + lastPosition = parsePosition(lastPositionGroup); + + if (lastPosition < firstPosition) { + throw new ByteRangeFormatException("Range header not valid: last position less than first position"); + } + } + } + + byteRange = new ByteRange(firstPosition, lastPosition); + } else { + throw new ByteRangeFormatException("Range header not valid"); + } + } + + return Optional.ofNullable(byteRange); + } + + private long parsePosition(final String positionGroup) { + try { + return Long.parseLong(positionGroup); + } catch (final NumberFormatException e) { + throw new ByteRangeFormatException("Range header position not valid"); + } + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/StreamingOutputResponseBuilder.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/StreamingOutputResponseBuilder.java new file mode 100644 index 0000000000..6aad5004bd --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/streaming/StreamingOutputResponseBuilder.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.streaming; + +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.StreamingOutput; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalLong; + +/** + * HTTP Response Builder for Streaming Output with optional Range header handling + */ +public class StreamingOutputResponseBuilder { + static final String ACCEPT_RANGES_HEADER = "Accept-Ranges"; + + static final String CONTENT_RANGE_HEADER = "Content-Range"; + + private static final String BYTES_UNIT = "bytes"; + + private static final String CONTENT_RANGE_BYTES = "bytes %d-%d/%d"; + + private static final int LAST_POSITION_OFFSET = -1; + + private static final ByteRangeParser byteRangeParser = new StandardByteRangeParser(); + + private final InputStream inputStream; + + private String range; + + private boolean acceptRanges; + + /** + * Streaming Output Response Builder with required Input Stream + * + * @param inputStream Input Stream to be transferred + */ + public StreamingOutputResponseBuilder(final InputStream inputStream) { + this.inputStream = Objects.requireNonNull(inputStream, "Input Stream required"); + } + + /** + * Set HTTP Range header + * + * @param range Range header can be null or empty + * @return Builder + */ + public StreamingOutputResponseBuilder range(final String range) { + this.range = range; + this.acceptRanges = true; + return this; + } + + /** + * Process arguments and prepare HTTP Response Builder + * + * @return Response Builder + */ + public Response.ResponseBuilder build() { + final Response.ResponseBuilder responseBuilder; + + final Optional byteRangeFound = byteRangeParser.readByteRange(range); + if (byteRangeFound.isPresent()) { + final int completeLength = getCompleteLength(); + final ByteRange byteRange = byteRangeFound.get(); + final StreamingOutput streamingOutput = new ByteRangeStreamingOutput(inputStream, byteRange); + responseBuilder = Response.status(Response.Status.PARTIAL_CONTENT).entity(streamingOutput); + + final String contentRange = getContentRange(byteRange, completeLength); + responseBuilder.header(CONTENT_RANGE_HEADER, contentRange); + } else { + final StreamingOutput streamingOutput = new InputStreamingOutput(inputStream); + responseBuilder = Response.ok(streamingOutput); + } + + if (acceptRanges) { + responseBuilder.header(ACCEPT_RANGES_HEADER, BYTES_UNIT); + } + + return responseBuilder; + } + + private int getCompleteLength() { + try { + return inputStream.available(); + } catch (final IOException e) { + throw new UncheckedIOException("Complete Length read failed", e); + } + } + + private String getContentRange(final ByteRange byteRange, final int completeLength) { + final OptionalLong lastPositionFound = byteRange.getLastPosition(); + final OptionalLong firstPositionFound = byteRange.getFirstPosition(); + + final long lastPositionCompleteLength = completeLength - LAST_POSITION_OFFSET; + + final long lastPosition; + if (lastPositionFound.isEmpty()) { + lastPosition = lastPositionCompleteLength; + } else { + final long lastPositionRequested = lastPositionFound.getAsLong(); + lastPosition = Math.min(lastPositionRequested, lastPositionCompleteLength); + } + + final long firstPosition; + if (firstPositionFound.isEmpty()) { + firstPosition = 0; + } else { + firstPosition = firstPositionFound.getAsLong(); + } + + return CONTENT_RANGE_BYTES.formatted(firstPosition, lastPosition, completeLength); + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/streaming/ByteRangeStreamingOutputTest.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/streaming/ByteRangeStreamingOutputTest.java new file mode 100644 index 0000000000..3d060b847c --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/streaming/ByteRangeStreamingOutputTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.streaming; + +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class ByteRangeStreamingOutputTest { + + private static final byte[] INPUT_BYTES = String.class.getSimpleName().getBytes(StandardCharsets.UTF_8);; + + private static final long NOT_SATISFIABLE_LENGTH = 1000; + + @Test + void testWriteRangeZeroToUnspecified() throws IOException { + final ByteRange byteRange = new ByteRange(0L, null); + + final byte[] outputBytes = writeBytes(byteRange); + + assertArrayEquals(INPUT_BYTES, outputBytes); + } + + @Test + void testWriteRangeZeroToOne() throws IOException { + final ByteRange byteRange = new ByteRange(0L, 1L); + + final byte[] outputBytes = writeBytes(byteRange); + + assertEquals(1, outputBytes.length); + + final byte first = outputBytes[0]; + assertEquals(INPUT_BYTES[0], first); + } + + @Test + void testWriteRangeZeroToAvailableLength() throws IOException { + final ByteRange byteRange = new ByteRange(0L, (long) INPUT_BYTES.length); + + final byte[] outputBytes = writeBytes(byteRange); + + assertArrayEquals(INPUT_BYTES, outputBytes); + } + + @Test + void testWriteRangeZeroToMaximumLong() throws IOException { + final ByteRange byteRange = new ByteRange(0L, Long.MAX_VALUE); + + final byte[] outputBytes = writeBytes(byteRange); + + assertArrayEquals(INPUT_BYTES, outputBytes); + } + + @Test + void testWriteRangeOneToTwo() throws IOException { + final ByteRange byteRange = new ByteRange(1L, 2L); + + final byte[] outputBytes = writeBytes(byteRange); + + assertEquals(1, outputBytes.length); + + final byte first = outputBytes[0]; + assertEquals(INPUT_BYTES[1], first); + } + + @Test + void testWriteRangeFirstPositionNotSatisfiable() { + final ByteRange byteRange = new ByteRange(NOT_SATISFIABLE_LENGTH, Long.MAX_VALUE); + + assertThrows(RangeNotSatisfiableException.class, () -> writeBytes(byteRange)); + } + + @Test + void testWriteRangeUnspecifiedToOne() throws IOException { + final ByteRange byteRange = new ByteRange(null, 1L); + + final byte[] outputBytes = writeBytes(byteRange); + + assertEquals(1, outputBytes.length); + + final byte first = outputBytes[0]; + final int lastIndex = INPUT_BYTES.length - 1; + final byte lastInput = INPUT_BYTES[lastIndex]; + assertEquals(lastInput, first); + } + + private byte[] writeBytes(final ByteRange byteRange) throws IOException { + final InputStream inputStream = new ByteArrayInputStream(INPUT_BYTES); + + final ByteRangeStreamingOutput streamingOutput = new ByteRangeStreamingOutput(inputStream, byteRange); + + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + streamingOutput.write(outputStream); + + return outputStream.toByteArray(); + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/streaming/StandardByteRangeParserTest.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/streaming/StandardByteRangeParserTest.java new file mode 100644 index 0000000000..98d65ea1e7 --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/streaming/StandardByteRangeParserTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.streaming; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.OptionalLong; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class StandardByteRangeParserTest { + private static final String EMPTY = ""; + + private static final String INVALID_UNIT = "octets=0-1"; + + private static final String BYTES_RANGE_FORMAT = "bytes=%d-%d"; + + private static final String BYTES_RANGE_LAST_POSITION_EMPTY = "bytes=%d-"; + + private static final String BYTES_RANGE_FIRST_POSITION_EMPTY = "bytes=-%d"; + + private static final String BYTES_RANGE_INVALID_NUMBER = "bytes=0-9988776655443322110"; + + private static final String BYTES_RANGE_MISSING_NUMBERS = "bytes=-"; + + private final StandardByteRangeParser parser = new StandardByteRangeParser(); + + @Test + void testReadByteRangeNull() { + final Optional byteRangeFound = parser.readByteRange(null); + + assertTrue(byteRangeFound.isEmpty()); + } + + @Test + void testReadByteRangeEmpty() { + final Optional byteRangeFound = parser.readByteRange(EMPTY); + + assertTrue(byteRangeFound.isEmpty()); + } + + @Test + void testReadByteRangeUnitNotValid() { + assertThrows(ByteRangeFormatException.class, () -> parser.readByteRange(INVALID_UNIT)); + } + + @Test + void testReadByteRangeNumbersNotSpecified() { + assertThrows(ByteRangeFormatException.class, () -> parser.readByteRange(BYTES_RANGE_MISSING_NUMBERS)); + } + + @Test + void testReadByteRangeNumberNotValid() { + assertThrows(ByteRangeFormatException.class, () -> parser.readByteRange(BYTES_RANGE_INVALID_NUMBER)); + } + + @Test + void testReadByteRangeLastPositionLessThanFirstPosition() { + final String rangeHeader = BYTES_RANGE_FORMAT.formatted(Long.MAX_VALUE, 0); + + assertThrows(ByteRangeFormatException.class, () -> parser.readByteRange(rangeHeader)); + } + + @Test + void testReadByteRangeZeroToUnspecified() { + final long firstPosition = 0; + + final String rangeHeader = BYTES_RANGE_LAST_POSITION_EMPTY.formatted(firstPosition); + + final Optional byteRangeFound = parser.readByteRange(rangeHeader); + + assertTrue(byteRangeFound.isPresent()); + + final ByteRange byteRange = byteRangeFound.get(); + + final OptionalLong firstPositionFound = byteRange.getFirstPosition(); + assertTrue(firstPositionFound.isPresent()); + assertEquals(firstPosition, firstPositionFound.getAsLong()); + + final OptionalLong lastPositionFound = byteRange.getLastPosition(); + assertTrue(lastPositionFound.isEmpty()); + } + + @Test + void testReadByteRangeSuffixRangeOne() { + final long lastPosition = 1; + + final String rangeHeader = BYTES_RANGE_FIRST_POSITION_EMPTY.formatted(lastPosition); + + final Optional byteRangeFound = parser.readByteRange(rangeHeader); + + assertTrue(byteRangeFound.isPresent()); + + final ByteRange byteRange = byteRangeFound.get(); + + final OptionalLong firstPositionFound = byteRange.getFirstPosition(); + assertTrue(firstPositionFound.isEmpty()); + + final OptionalLong lastPositionFound = byteRange.getLastPosition(); + assertTrue(lastPositionFound.isPresent()); + assertEquals(lastPosition, lastPositionFound.getAsLong()); + } + + @Test + void testReadByteRangeZeroToMaximumLong() { + final long firstPosition = 0; + final long lastPosition = Long.MAX_VALUE; + + final String rangeHeader = BYTES_RANGE_FORMAT.formatted(firstPosition, lastPosition); + + final Optional byteRangeFound = parser.readByteRange(rangeHeader); + + assertTrue(byteRangeFound.isPresent()); + + final ByteRange byteRange = byteRangeFound.get(); + + final OptionalLong firstPositionFound = byteRange.getFirstPosition(); + assertTrue(firstPositionFound.isPresent()); + assertEquals(firstPosition, firstPositionFound.getAsLong()); + + final OptionalLong lastPositionFound = byteRange.getLastPosition(); + assertTrue(lastPositionFound.isPresent()); + assertEquals(lastPosition, lastPositionFound.getAsLong()); + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/streaming/StreamingOutputResponseBuilderTest.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/streaming/StreamingOutputResponseBuilderTest.java new file mode 100644 index 0000000000..8b0481f37b --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/streaming/StreamingOutputResponseBuilderTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.streaming; + +import jakarta.ws.rs.core.Response; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class StreamingOutputResponseBuilderTest { + + private static final byte[] INPUT_BYTES = String.class.getSimpleName().getBytes(StandardCharsets.UTF_8);; + + private static final String RANGE = "bytes=0-%d".formatted(INPUT_BYTES.length); + + private static final String ACCEPT_RANGES_BYTES = "bytes"; + + private static final String CONTENT_RANGE_EXPECTED = "bytes 0-%d/%d".formatted(INPUT_BYTES.length, INPUT_BYTES.length); + + @Test + void testBuildInputStream() { + final InputStream inputStream = new ByteArrayInputStream(INPUT_BYTES); + + final StreamingOutputResponseBuilder builder = new StreamingOutputResponseBuilder(inputStream); + + final Response.ResponseBuilder responseBuilder = builder.build(); + + try (Response response = responseBuilder.build()) { + assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + } + } + + @Test + void testBuildRange() { + final InputStream inputStream = new ByteArrayInputStream(INPUT_BYTES); + + final StreamingOutputResponseBuilder builder = new StreamingOutputResponseBuilder(inputStream); + builder.range(RANGE); + + final Response.ResponseBuilder responseBuilder = builder.build(); + + try (Response response = responseBuilder.build()) { + assertEquals(Response.Status.PARTIAL_CONTENT.getStatusCode(), response.getStatus()); + + final String acceptRanges = response.getHeaderString(StreamingOutputResponseBuilder.ACCEPT_RANGES_HEADER); + assertEquals(ACCEPT_RANGES_BYTES, acceptRanges); + + final String contentRange = response.getHeaderString(StreamingOutputResponseBuilder.CONTENT_RANGE_HEADER); + assertEquals(CONTENT_RANGE_EXPECTED, contentRange); + } + } +}