NIFI-13652 Added Range Header Handling for Content Download (#9172)

This closes #9172
This commit is contained in:
David Handermann 2024-08-14 13:44:21 -05:00 committed by GitHub
parent 60e99184f8
commit 89c757999b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 905 additions and 70 deletions

View File

@ -42,6 +42,7 @@ import org.apache.nifi.web.api.config.NoResponseFromNodesExceptionMapper;
import org.apache.nifi.web.api.config.NodeDisconnectionExceptionMapper; import org.apache.nifi.web.api.config.NodeDisconnectionExceptionMapper;
import org.apache.nifi.web.api.config.NodeReconnectionExceptionMapper; import org.apache.nifi.web.api.config.NodeReconnectionExceptionMapper;
import org.apache.nifi.web.api.config.NotFoundExceptionMapper; import org.apache.nifi.web.api.config.NotFoundExceptionMapper;
import org.apache.nifi.web.api.config.RangeNotSatisfiableExceptionMapper;
import org.apache.nifi.web.api.config.ResourceNotFoundExceptionMapper; import org.apache.nifi.web.api.config.ResourceNotFoundExceptionMapper;
import org.apache.nifi.web.api.config.ThrowableMapper; import org.apache.nifi.web.api.config.ThrowableMapper;
import org.apache.nifi.web.api.config.UnknownNodeExceptionMapper; import org.apache.nifi.web.api.config.UnknownNodeExceptionMapper;
@ -131,6 +132,7 @@ public class NiFiWebApiResourceConfig extends ResourceConfig {
register(NoResponseFromNodesExceptionMapper.class); register(NoResponseFromNodesExceptionMapper.class);
register(NodeDisconnectionExceptionMapper.class); register(NodeDisconnectionExceptionMapper.class);
register(NodeReconnectionExceptionMapper.class); register(NodeReconnectionExceptionMapper.class);
register(RangeNotSatisfiableExceptionMapper.class);
register(ResourceNotFoundExceptionMapper.class); register(ResourceNotFoundExceptionMapper.class);
register(NotFoundExceptionMapper.class); register(NotFoundExceptionMapper.class);
register(UnknownNodeExceptionMapper.class); register(UnknownNodeExceptionMapper.class);

View File

@ -16,9 +16,6 @@
*/ */
package org.apache.nifi.web.api; package org.apache.nifi.web.api;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI; import java.net.URI;
import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Operation;
@ -33,13 +30,13 @@ import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DELETE; import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.DefaultValue; import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.GET; import jakarta.ws.rs.GET;
import jakarta.ws.rs.HeaderParam;
import jakarta.ws.rs.HttpMethod; import jakarta.ws.rs.HttpMethod;
import jakarta.ws.rs.POST; import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path; import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam; import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces; import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam; import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response; import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status; import jakarta.ws.rs.core.Response.Status;
@ -52,7 +49,6 @@ import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.web.DownloadableContent; import org.apache.nifi.web.DownloadableContent;
import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.api.dto.DropRequestDTO; import org.apache.nifi.web.api.dto.DropRequestDTO;
@ -65,6 +61,7 @@ import org.apache.nifi.web.api.entity.Entity;
import org.apache.nifi.web.api.entity.FlowFileEntity; import org.apache.nifi.web.api.entity.FlowFileEntity;
import org.apache.nifi.web.api.entity.ListingRequestEntity; import org.apache.nifi.web.api.entity.ListingRequestEntity;
import org.apache.nifi.web.api.request.ClientIdParameter; import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.api.streaming.StreamingOutputResponseBuilder;
import org.apache.nifi.web.util.ResponseBuilderUtils; import org.apache.nifi.web.util.ResponseBuilderUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller; import org.springframework.stereotype.Controller;
@ -194,7 +191,6 @@ public class FlowFileQueueResource extends ApplicationResource {
* @param flowFileUuid The flowfile uuid * @param flowFileUuid The flowfile uuid
* @param clusterNodeId The cluster node id * @param clusterNodeId The cluster node id
* @return The content stream * @return The content stream
* @throws InterruptedException if interrupted
*/ */
@GET @GET
@Consumes(MediaType.WILDCARD) @Consumes(MediaType.WILDCARD)
@ -209,14 +205,20 @@ public class FlowFileQueueResource extends ApplicationResource {
) )
@ApiResponses( @ApiResponses(
value = { value = {
@ApiResponse(responseCode = "206", description = "Partial Content with range of bytes requested"),
@ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(responseCode = "401", description = "Client could not be authenticated."), @ApiResponse(responseCode = "401", description = "Client could not be authenticated."),
@ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."),
@ApiResponse(responseCode = "404", description = "The specified resource could not be found."), @ApiResponse(responseCode = "404", description = "The specified resource could not be found."),
@ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it."),
@ApiResponse(responseCode = "416", description = "Requested Range Not Satisfiable based on bytes requested")
} }
) )
public Response downloadFlowFileContent( public Response downloadFlowFileContent(
@Parameter(
description = "Range of bytes requested"
)
@HeaderParam("Range") final String rangeHeader,
@Parameter( @Parameter(
description = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response." description = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response."
) )
@ -257,30 +259,14 @@ public class FlowFileQueueResource extends ApplicationResource {
// get the uri of the request // get the uri of the request
final String uri = generateResourceUri("flowfile-queues", connectionId, "flowfiles", flowFileUuid, "content"); final String uri = generateResourceUri("flowfile-queues", connectionId, "flowfiles", flowFileUuid, "content");
// get an input stream to the content
final DownloadableContent content = serviceFacade.getContent(connectionId, flowFileUuid, uri); final DownloadableContent content = serviceFacade.getContent(connectionId, flowFileUuid, uri);
final Response.ResponseBuilder responseBuilder = noCache(new StreamingOutputResponseBuilder(content.getContent()).range(rangeHeader).build());
// generate a streaming response
final StreamingOutput response = new StreamingOutput() {
@Override
public void write(final OutputStream output) throws IOException, WebApplicationException {
try (InputStream is = content.getContent()) {
// stream the content to the response
StreamUtils.copy(is, output);
// flush the response
output.flush();
}
}
};
// use the appropriate content type
String contentType = content.getType(); String contentType = content.getType();
if (contentType == null) { if (contentType == null) {
contentType = MediaType.APPLICATION_OCTET_STREAM; contentType = MediaType.APPLICATION_OCTET_STREAM;
} }
responseBuilder.type(contentType);
final Response.ResponseBuilder responseBuilder = generateOkResponse(response).type(contentType);
return ResponseBuilderUtils.setContentDisposition(responseBuilder, content.getFilename()).build(); return ResponseBuilderUtils.setContentDisposition(responseBuilder, content.getFilename()).build();
} }

View File

@ -28,13 +28,13 @@ import jakarta.servlet.http.HttpServletRequest;
import jakarta.ws.rs.Consumes; import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DefaultValue; import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.GET; import jakarta.ws.rs.GET;
import jakarta.ws.rs.HeaderParam;
import jakarta.ws.rs.HttpMethod; import jakarta.ws.rs.HttpMethod;
import jakarta.ws.rs.POST; import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path; import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam; import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces; import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam; import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.Context; import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response; import jakarta.ws.rs.core.Response;
@ -48,7 +48,6 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.web.DownloadableContent; import org.apache.nifi.web.DownloadableContent;
import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
@ -59,15 +58,13 @@ import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity;
import org.apache.nifi.web.api.entity.ReplayLastEventSnapshotDTO; import org.apache.nifi.web.api.entity.ReplayLastEventSnapshotDTO;
import org.apache.nifi.web.api.entity.SubmitReplayRequestEntity; import org.apache.nifi.web.api.entity.SubmitReplayRequestEntity;
import org.apache.nifi.web.api.request.LongParameter; import org.apache.nifi.web.api.request.LongParameter;
import org.apache.nifi.web.api.streaming.StreamingOutputResponseBuilder;
import org.apache.nifi.web.util.ResponseBuilderUtils; import org.apache.nifi.web.util.ResponseBuilderUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller; import org.springframework.stereotype.Controller;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI; import java.net.URI;
import java.util.Collections; import java.util.Collections;
@ -104,14 +101,20 @@ public class ProvenanceEventResource extends ApplicationResource {
) )
@ApiResponses( @ApiResponses(
value = { value = {
@ApiResponse(responseCode = "206", description = "Partial Content with range of bytes requested"),
@ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(responseCode = "401", description = "Client could not be authenticated."), @ApiResponse(responseCode = "401", description = "Client could not be authenticated."),
@ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."),
@ApiResponse(responseCode = "404", description = "The specified resource could not be found."), @ApiResponse(responseCode = "404", description = "The specified resource could not be found."),
@ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it."),
@ApiResponse(responseCode = "416", description = "Requested Range Not Satisfiable based on bytes requested")
} }
) )
public Response getInputContent( public Response getInputContent(
@Parameter(
description = "Range of bytes requested"
)
@HeaderParam("Range") final String rangeHeader,
@Parameter( @Parameter(
description = "The id of the node where the content exists if clustered." description = "The id of the node where the content exists if clustered."
) )
@ -140,30 +143,13 @@ public class ProvenanceEventResource extends ApplicationResource {
// get the uri of the request // get the uri of the request
final String uri = generateResourceUri("provenance", "events", String.valueOf(id.getLong()), "content", "input"); final String uri = generateResourceUri("provenance", "events", String.valueOf(id.getLong()), "content", "input");
// get an input stream to the content
final DownloadableContent content = serviceFacade.getContent(id.getLong(), uri, ContentDirection.INPUT); final DownloadableContent content = serviceFacade.getContent(id.getLong(), uri, ContentDirection.INPUT);
final Response.ResponseBuilder responseBuilder = noCache(new StreamingOutputResponseBuilder(content.getContent()).range(rangeHeader).build());
// generate a streaming response
final StreamingOutput response = new StreamingOutput() {
@Override
public void write(OutputStream output) throws IOException, WebApplicationException {
try (InputStream is = content.getContent()) {
// stream the content to the response
StreamUtils.copy(is, output);
// flush the response
output.flush();
}
}
};
// use the appropriate content type
String contentType = content.getType(); String contentType = content.getType();
if (contentType == null) { if (contentType == null) {
contentType = MediaType.APPLICATION_OCTET_STREAM; contentType = MediaType.APPLICATION_OCTET_STREAM;
} }
responseBuilder.type(contentType);
final Response.ResponseBuilder responseBuilder = generateOkResponse(response).type(contentType);
return ResponseBuilderUtils.setContentDisposition(responseBuilder, content.getFilename()).build(); return ResponseBuilderUtils.setContentDisposition(responseBuilder, content.getFilename()).build();
} }
@ -188,14 +174,20 @@ public class ProvenanceEventResource extends ApplicationResource {
) )
@ApiResponses( @ApiResponses(
value = { value = {
@ApiResponse(responseCode = "206", description = "Partial Content with range of bytes requested"),
@ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(responseCode = "401", description = "Client could not be authenticated."), @ApiResponse(responseCode = "401", description = "Client could not be authenticated."),
@ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."),
@ApiResponse(responseCode = "404", description = "The specified resource could not be found."), @ApiResponse(responseCode = "404", description = "The specified resource could not be found."),
@ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it."),
@ApiResponse(responseCode = "416", description = "Requested Range Not Satisfiable based on bytes requested"),
} }
) )
public Response getOutputContent( public Response getOutputContent(
@Parameter(
description = "Range of bytes requested"
)
@HeaderParam("Range") final String rangeHeader,
@Parameter( @Parameter(
description = "The id of the node where the content exists if clustered." description = "The id of the node where the content exists if clustered."
) )
@ -224,30 +216,13 @@ public class ProvenanceEventResource extends ApplicationResource {
// get the uri of the request // get the uri of the request
final String uri = generateResourceUri("provenance", "events", String.valueOf(id.getLong()), "content", "output"); final String uri = generateResourceUri("provenance", "events", String.valueOf(id.getLong()), "content", "output");
// get an input stream to the content
final DownloadableContent content = serviceFacade.getContent(id.getLong(), uri, ContentDirection.OUTPUT); final DownloadableContent content = serviceFacade.getContent(id.getLong(), uri, ContentDirection.OUTPUT);
final Response.ResponseBuilder responseBuilder = noCache(new StreamingOutputResponseBuilder(content.getContent()).range(rangeHeader).build());
// generate a streaming response
final StreamingOutput response = new StreamingOutput() {
@Override
public void write(OutputStream output) throws IOException, WebApplicationException {
try (InputStream is = content.getContent()) {
// stream the content to the response
StreamUtils.copy(is, output);
// flush the response
output.flush();
}
}
};
// use the appropriate content type
String contentType = content.getType(); String contentType = content.getType();
if (contentType == null) { if (contentType == null) {
contentType = MediaType.APPLICATION_OCTET_STREAM; contentType = MediaType.APPLICATION_OCTET_STREAM;
} }
responseBuilder.type(contentType);
final Response.ResponseBuilder responseBuilder = generateOkResponse(response).type(contentType);
return ResponseBuilderUtils.setContentDisposition(responseBuilder, content.getFilename()).build(); return ResponseBuilderUtils.setContentDisposition(responseBuilder, content.getFilename()).build();
} }

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.config;
import jakarta.ws.rs.core.MediaType;
import org.apache.nifi.web.api.streaming.RangeNotSatisfiableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.ext.ExceptionMapper;
import jakarta.ws.rs.ext.Provider;
/**
* Map Range Not Satisfiable Exception to HTTP 416 Responses
*/
@Provider
public class RangeNotSatisfiableExceptionMapper implements ExceptionMapper<RangeNotSatisfiableException> {
private static final Logger logger = LoggerFactory.getLogger(RangeNotSatisfiableExceptionMapper.class);
@Override
public Response toResponse(final RangeNotSatisfiableException exception) {
logger.info("HTTP 416 Range Not Satisfiable: {}", exception.getMessage());
return Response.status(Response.Status.REQUESTED_RANGE_NOT_SATISFIABLE)
.entity(exception.getMessage())
.type(MediaType.TEXT_PLAIN_TYPE)
.build();
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.streaming;
import java.util.Objects;
import java.util.OptionalLong;
/**
* Range of bytes requested as described in RFC 9110 Section 14.1.2 with optional first and last positions
*/
public class ByteRange {
private final Long firstPosition;
private final Long lastPosition;
public ByteRange(final Long firstPosition, final Long lastPosition) {
if (firstPosition == null) {
Objects.requireNonNull(lastPosition, "Last Position required");
}
this.firstPosition = firstPosition;
this.lastPosition = lastPosition;
}
/**
* Get first position in byte range which can be empty indicating the last position must be specified
*
* @return First position starting with 0 or empty
*/
public OptionalLong getFirstPosition() {
return firstPosition == null ? OptionalLong.empty() : OptionalLong.of(firstPosition);
}
/**
* Get last position in byte range which can empty indicating the first position must be specified
*
* @return Last position starting with 0 or empty
*/
public OptionalLong getLastPosition() {
return lastPosition == null ? OptionalLong.empty() : OptionalLong.of(lastPosition);
}
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.streaming;
/**
* Byte Range Format Exception indicating invalid units specified in Range Header
*/
public class ByteRangeFormatException extends IllegalArgumentException {
public ByteRangeFormatException(final String message) {
super(message);
}
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.streaming;
import java.util.Optional;
/**
* HTTP Range Header Parser abstraction supporting byte ranges as described in RFC 9110 Section 14.1.2
*/
public interface ByteRangeParser {
/**
* Read Byte Range from HTTP Range Header
*
* @param rangeHeader HTTP Range Header
* @return Byte Range or empty when Range Header not provided
*/
Optional<ByteRange> readByteRange(String rangeHeader);
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.streaming;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.StreamingOutput;
import org.apache.nifi.stream.io.LimitingInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;
import java.util.OptionalLong;
/**
* Streaming Output implementation supporting HTTP Range Header with specified first and last byte positions
*/
public class ByteRangeStreamingOutput implements StreamingOutput {
private final InputStream inputStream;
private final ByteRange byteRange;
/**
* Byte Range Streaming Output with required arguments
*
* @param inputStream Input Stream to be transferred
* @param byteRange Byte Range containing first and last positions
*/
public ByteRangeStreamingOutput(final InputStream inputStream, final ByteRange byteRange) {
Objects.requireNonNull(inputStream, "Input Stream required");
this.byteRange = Objects.requireNonNull(byteRange, "Byte Range required");
final OptionalLong lastPositionFound = byteRange.getLastPosition();
if (lastPositionFound.isPresent()) {
final long lastPosition = lastPositionFound.getAsLong();
final OptionalLong firstPositionFound = byteRange.getFirstPosition();
if (firstPositionFound.isPresent()) {
// Handle int-range when last position indicates limited number of bytes
this.inputStream = new LimitingInputStream(inputStream, lastPosition);
} else {
// Handle suffix-range when last position indicates the last number of bytes from the end
this.inputStream = inputStream;
}
} else {
this.inputStream = inputStream;
}
}
@Override
public void write(final OutputStream outputStream) throws IOException, WebApplicationException {
try (inputStream) {
final OptionalLong firstPositionFound = byteRange.getFirstPosition();
final OptionalLong lastPositionFound = byteRange.getLastPosition();
if (firstPositionFound.isPresent()) {
// Handle int-range with first position specified
final long firstPosition = firstPositionFound.getAsLong();
try {
inputStream.skipNBytes(firstPosition);
} catch (final EOFException e) {
throw new RangeNotSatisfiableException("First Range Position [%d] not valid".formatted(firstPosition), e);
}
} else if (lastPositionFound.isPresent()) {
// Handle suffix-range for last number of bytes specified
final long lastPosition = lastPositionFound.getAsLong();
final long available = inputStream.available();
final long skip = available - lastPosition;
inputStream.skipNBytes(skip);
}
inputStream.transferTo(outputStream);
}
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.streaming;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.StreamingOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;
/**
* Streaming Output implementation supporting direct transfer of Input Stream
*/
public class InputStreamingOutput implements StreamingOutput {
private final InputStream inputStream;
/**
* Streaming Output with required arguments
*
* @param inputStream Input Stream to be transferred
*/
public InputStreamingOutput(final InputStream inputStream) {
this.inputStream = Objects.requireNonNull(inputStream, "Input Stream required");
}
@Override
public void write(final OutputStream outputStream) throws IOException, WebApplicationException {
try (inputStream) {
inputStream.transferTo(outputStream);
}
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.streaming;
import java.io.IOException;
/**
* Runtime Exception indicating that the requested range is outside the bounds of available content streams
*/
public class RangeNotSatisfiableException extends IOException {
public RangeNotSatisfiableException(final String message, final Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.streaming;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Standard implementation of Byte Range Header Parser supporting one range specifier of bytes with int-range or suffix-range values
*/
public class StandardByteRangeParser implements ByteRangeParser {
private static final Pattern BYTE_RANGE_PATTERN = Pattern.compile("^bytes=(0|[1-9][0-9]{0,18})?-(0|[1-9][0-9]{0,18})?$");
private static final int FIRST_POSITION_GROUP = 1;
private static final int LAST_POSITION_GROUP = 2;
@Override
public Optional<ByteRange> readByteRange(final String rangeHeader) {
final ByteRange byteRange;
if (rangeHeader == null || rangeHeader.isBlank()) {
byteRange = null;
} else {
final Matcher matcher = BYTE_RANGE_PATTERN.matcher(rangeHeader);
if (matcher.matches()) {
final Long firstPosition;
final Long lastPosition;
final String firstPositionGroup = matcher.group(FIRST_POSITION_GROUP);
final String lastPositionGroup = matcher.group(LAST_POSITION_GROUP);
if (firstPositionGroup == null) {
if (lastPositionGroup == null) {
throw new ByteRangeFormatException("Range header missing first and last positions");
}
firstPosition = null;
lastPosition = parsePosition(lastPositionGroup);
} else {
firstPosition = parsePosition(firstPositionGroup);
if (lastPositionGroup == null) {
lastPosition = null;
} else {
lastPosition = parsePosition(lastPositionGroup);
if (lastPosition < firstPosition) {
throw new ByteRangeFormatException("Range header not valid: last position less than first position");
}
}
}
byteRange = new ByteRange(firstPosition, lastPosition);
} else {
throw new ByteRangeFormatException("Range header not valid");
}
}
return Optional.ofNullable(byteRange);
}
private long parsePosition(final String positionGroup) {
try {
return Long.parseLong(positionGroup);
} catch (final NumberFormatException e) {
throw new ByteRangeFormatException("Range header position not valid");
}
}
}

View File

@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.streaming;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.StreamingOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
/**
* HTTP Response Builder for Streaming Output with optional Range header handling
*/
public class StreamingOutputResponseBuilder {
static final String ACCEPT_RANGES_HEADER = "Accept-Ranges";
static final String CONTENT_RANGE_HEADER = "Content-Range";
private static final String BYTES_UNIT = "bytes";
private static final String CONTENT_RANGE_BYTES = "bytes %d-%d/%d";
private static final int LAST_POSITION_OFFSET = -1;
private static final ByteRangeParser byteRangeParser = new StandardByteRangeParser();
private final InputStream inputStream;
private String range;
private boolean acceptRanges;
/**
* Streaming Output Response Builder with required Input Stream
*
* @param inputStream Input Stream to be transferred
*/
public StreamingOutputResponseBuilder(final InputStream inputStream) {
this.inputStream = Objects.requireNonNull(inputStream, "Input Stream required");
}
/**
* Set HTTP Range header
*
* @param range Range header can be null or empty
* @return Builder
*/
public StreamingOutputResponseBuilder range(final String range) {
this.range = range;
this.acceptRanges = true;
return this;
}
/**
* Process arguments and prepare HTTP Response Builder
*
* @return Response Builder
*/
public Response.ResponseBuilder build() {
final Response.ResponseBuilder responseBuilder;
final Optional<ByteRange> byteRangeFound = byteRangeParser.readByteRange(range);
if (byteRangeFound.isPresent()) {
final int completeLength = getCompleteLength();
final ByteRange byteRange = byteRangeFound.get();
final StreamingOutput streamingOutput = new ByteRangeStreamingOutput(inputStream, byteRange);
responseBuilder = Response.status(Response.Status.PARTIAL_CONTENT).entity(streamingOutput);
final String contentRange = getContentRange(byteRange, completeLength);
responseBuilder.header(CONTENT_RANGE_HEADER, contentRange);
} else {
final StreamingOutput streamingOutput = new InputStreamingOutput(inputStream);
responseBuilder = Response.ok(streamingOutput);
}
if (acceptRanges) {
responseBuilder.header(ACCEPT_RANGES_HEADER, BYTES_UNIT);
}
return responseBuilder;
}
private int getCompleteLength() {
try {
return inputStream.available();
} catch (final IOException e) {
throw new UncheckedIOException("Complete Length read failed", e);
}
}
private String getContentRange(final ByteRange byteRange, final int completeLength) {
final OptionalLong lastPositionFound = byteRange.getLastPosition();
final OptionalLong firstPositionFound = byteRange.getFirstPosition();
final long lastPositionCompleteLength = completeLength - LAST_POSITION_OFFSET;
final long lastPosition;
if (lastPositionFound.isEmpty()) {
lastPosition = lastPositionCompleteLength;
} else {
final long lastPositionRequested = lastPositionFound.getAsLong();
lastPosition = Math.min(lastPositionRequested, lastPositionCompleteLength);
}
final long firstPosition;
if (firstPositionFound.isEmpty()) {
firstPosition = 0;
} else {
firstPosition = firstPositionFound.getAsLong();
}
return CONTENT_RANGE_BYTES.formatted(firstPosition, lastPosition, completeLength);
}
}

View File

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.streaming;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
class ByteRangeStreamingOutputTest {
private static final byte[] INPUT_BYTES = String.class.getSimpleName().getBytes(StandardCharsets.UTF_8);;
private static final long NOT_SATISFIABLE_LENGTH = 1000;
@Test
void testWriteRangeZeroToUnspecified() throws IOException {
final ByteRange byteRange = new ByteRange(0L, null);
final byte[] outputBytes = writeBytes(byteRange);
assertArrayEquals(INPUT_BYTES, outputBytes);
}
@Test
void testWriteRangeZeroToOne() throws IOException {
final ByteRange byteRange = new ByteRange(0L, 1L);
final byte[] outputBytes = writeBytes(byteRange);
assertEquals(1, outputBytes.length);
final byte first = outputBytes[0];
assertEquals(INPUT_BYTES[0], first);
}
@Test
void testWriteRangeZeroToAvailableLength() throws IOException {
final ByteRange byteRange = new ByteRange(0L, (long) INPUT_BYTES.length);
final byte[] outputBytes = writeBytes(byteRange);
assertArrayEquals(INPUT_BYTES, outputBytes);
}
@Test
void testWriteRangeZeroToMaximumLong() throws IOException {
final ByteRange byteRange = new ByteRange(0L, Long.MAX_VALUE);
final byte[] outputBytes = writeBytes(byteRange);
assertArrayEquals(INPUT_BYTES, outputBytes);
}
@Test
void testWriteRangeOneToTwo() throws IOException {
final ByteRange byteRange = new ByteRange(1L, 2L);
final byte[] outputBytes = writeBytes(byteRange);
assertEquals(1, outputBytes.length);
final byte first = outputBytes[0];
assertEquals(INPUT_BYTES[1], first);
}
@Test
void testWriteRangeFirstPositionNotSatisfiable() {
final ByteRange byteRange = new ByteRange(NOT_SATISFIABLE_LENGTH, Long.MAX_VALUE);
assertThrows(RangeNotSatisfiableException.class, () -> writeBytes(byteRange));
}
@Test
void testWriteRangeUnspecifiedToOne() throws IOException {
final ByteRange byteRange = new ByteRange(null, 1L);
final byte[] outputBytes = writeBytes(byteRange);
assertEquals(1, outputBytes.length);
final byte first = outputBytes[0];
final int lastIndex = INPUT_BYTES.length - 1;
final byte lastInput = INPUT_BYTES[lastIndex];
assertEquals(lastInput, first);
}
private byte[] writeBytes(final ByteRange byteRange) throws IOException {
final InputStream inputStream = new ByteArrayInputStream(INPUT_BYTES);
final ByteRangeStreamingOutput streamingOutput = new ByteRangeStreamingOutput(inputStream, byteRange);
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
streamingOutput.write(outputStream);
return outputStream.toByteArray();
}
}

View File

@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.streaming;
import org.junit.jupiter.api.Test;
import java.util.Optional;
import java.util.OptionalLong;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
class StandardByteRangeParserTest {
private static final String EMPTY = "";
private static final String INVALID_UNIT = "octets=0-1";
private static final String BYTES_RANGE_FORMAT = "bytes=%d-%d";
private static final String BYTES_RANGE_LAST_POSITION_EMPTY = "bytes=%d-";
private static final String BYTES_RANGE_FIRST_POSITION_EMPTY = "bytes=-%d";
private static final String BYTES_RANGE_INVALID_NUMBER = "bytes=0-9988776655443322110";
private static final String BYTES_RANGE_MISSING_NUMBERS = "bytes=-";
private final StandardByteRangeParser parser = new StandardByteRangeParser();
@Test
void testReadByteRangeNull() {
final Optional<ByteRange> byteRangeFound = parser.readByteRange(null);
assertTrue(byteRangeFound.isEmpty());
}
@Test
void testReadByteRangeEmpty() {
final Optional<ByteRange> byteRangeFound = parser.readByteRange(EMPTY);
assertTrue(byteRangeFound.isEmpty());
}
@Test
void testReadByteRangeUnitNotValid() {
assertThrows(ByteRangeFormatException.class, () -> parser.readByteRange(INVALID_UNIT));
}
@Test
void testReadByteRangeNumbersNotSpecified() {
assertThrows(ByteRangeFormatException.class, () -> parser.readByteRange(BYTES_RANGE_MISSING_NUMBERS));
}
@Test
void testReadByteRangeNumberNotValid() {
assertThrows(ByteRangeFormatException.class, () -> parser.readByteRange(BYTES_RANGE_INVALID_NUMBER));
}
@Test
void testReadByteRangeLastPositionLessThanFirstPosition() {
final String rangeHeader = BYTES_RANGE_FORMAT.formatted(Long.MAX_VALUE, 0);
assertThrows(ByteRangeFormatException.class, () -> parser.readByteRange(rangeHeader));
}
@Test
void testReadByteRangeZeroToUnspecified() {
final long firstPosition = 0;
final String rangeHeader = BYTES_RANGE_LAST_POSITION_EMPTY.formatted(firstPosition);
final Optional<ByteRange> byteRangeFound = parser.readByteRange(rangeHeader);
assertTrue(byteRangeFound.isPresent());
final ByteRange byteRange = byteRangeFound.get();
final OptionalLong firstPositionFound = byteRange.getFirstPosition();
assertTrue(firstPositionFound.isPresent());
assertEquals(firstPosition, firstPositionFound.getAsLong());
final OptionalLong lastPositionFound = byteRange.getLastPosition();
assertTrue(lastPositionFound.isEmpty());
}
@Test
void testReadByteRangeSuffixRangeOne() {
final long lastPosition = 1;
final String rangeHeader = BYTES_RANGE_FIRST_POSITION_EMPTY.formatted(lastPosition);
final Optional<ByteRange> byteRangeFound = parser.readByteRange(rangeHeader);
assertTrue(byteRangeFound.isPresent());
final ByteRange byteRange = byteRangeFound.get();
final OptionalLong firstPositionFound = byteRange.getFirstPosition();
assertTrue(firstPositionFound.isEmpty());
final OptionalLong lastPositionFound = byteRange.getLastPosition();
assertTrue(lastPositionFound.isPresent());
assertEquals(lastPosition, lastPositionFound.getAsLong());
}
@Test
void testReadByteRangeZeroToMaximumLong() {
final long firstPosition = 0;
final long lastPosition = Long.MAX_VALUE;
final String rangeHeader = BYTES_RANGE_FORMAT.formatted(firstPosition, lastPosition);
final Optional<ByteRange> byteRangeFound = parser.readByteRange(rangeHeader);
assertTrue(byteRangeFound.isPresent());
final ByteRange byteRange = byteRangeFound.get();
final OptionalLong firstPositionFound = byteRange.getFirstPosition();
assertTrue(firstPositionFound.isPresent());
assertEquals(firstPosition, firstPositionFound.getAsLong());
final OptionalLong lastPositionFound = byteRange.getLastPosition();
assertTrue(lastPositionFound.isPresent());
assertEquals(lastPosition, lastPositionFound.getAsLong());
}
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.streaming;
import jakarta.ws.rs.core.Response;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import static org.junit.jupiter.api.Assertions.assertEquals;
class StreamingOutputResponseBuilderTest {
private static final byte[] INPUT_BYTES = String.class.getSimpleName().getBytes(StandardCharsets.UTF_8);;
private static final String RANGE = "bytes=0-%d".formatted(INPUT_BYTES.length);
private static final String ACCEPT_RANGES_BYTES = "bytes";
private static final String CONTENT_RANGE_EXPECTED = "bytes 0-%d/%d".formatted(INPUT_BYTES.length, INPUT_BYTES.length);
@Test
void testBuildInputStream() {
final InputStream inputStream = new ByteArrayInputStream(INPUT_BYTES);
final StreamingOutputResponseBuilder builder = new StreamingOutputResponseBuilder(inputStream);
final Response.ResponseBuilder responseBuilder = builder.build();
try (Response response = responseBuilder.build()) {
assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
}
}
@Test
void testBuildRange() {
final InputStream inputStream = new ByteArrayInputStream(INPUT_BYTES);
final StreamingOutputResponseBuilder builder = new StreamingOutputResponseBuilder(inputStream);
builder.range(RANGE);
final Response.ResponseBuilder responseBuilder = builder.build();
try (Response response = responseBuilder.build()) {
assertEquals(Response.Status.PARTIAL_CONTENT.getStatusCode(), response.getStatus());
final String acceptRanges = response.getHeaderString(StreamingOutputResponseBuilder.ACCEPT_RANGES_HEADER);
assertEquals(ACCEPT_RANGES_BYTES, acceptRanges);
final String contentRange = response.getHeaderString(StreamingOutputResponseBuilder.CONTENT_RANGE_HEADER);
assertEquals(CONTENT_RANGE_EXPECTED, contentRange);
}
}
}