HDDS-1948. S3 MPU can't be created with octet-stream content-type (#1266)
This commit is contained in:
parent
abc8fde4ca
commit
edd708527d
|
@ -17,39 +17,60 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ozone.s3;
|
package org.apache.hadoop.ozone.s3;
|
||||||
|
|
||||||
|
import javax.annotation.Priority;
|
||||||
import javax.ws.rs.container.ContainerRequestContext;
|
import javax.ws.rs.container.ContainerRequestContext;
|
||||||
import javax.ws.rs.container.ContainerRequestFilter;
|
import javax.ws.rs.container.ContainerRequestFilter;
|
||||||
import javax.ws.rs.container.PreMatching;
|
import javax.ws.rs.container.PreMatching;
|
||||||
import javax.ws.rs.core.MediaType;
|
import javax.ws.rs.core.MediaType;
|
||||||
|
import javax.ws.rs.core.MultivaluedMap;
|
||||||
import javax.ws.rs.ext.Provider;
|
import javax.ws.rs.ext.Provider;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Filter to adjust request headers for compatible reasons.
|
* Filter to adjust request headers for compatible reasons.
|
||||||
|
*
|
||||||
|
* It should be executed AFTER signature check (VirtualHostStyleFilter) as the
|
||||||
|
* original Content-Type could be part of the base of the signature.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
@Provider
|
@Provider
|
||||||
@PreMatching
|
@PreMatching
|
||||||
|
@Priority(VirtualHostStyleFilter.PRIORITY
|
||||||
|
+ S3GatewayHttpServer.FILTER_PRIORITY_DO_AFTER)
|
||||||
public class HeaderPreprocessor implements ContainerRequestFilter {
|
public class HeaderPreprocessor implements ContainerRequestFilter {
|
||||||
|
|
||||||
|
public static final String MULTIPART_UPLOAD_MARKER = "ozone/mpu";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void filter(ContainerRequestContext requestContext) throws
|
public void filter(ContainerRequestContext requestContext) throws
|
||||||
IOException {
|
IOException {
|
||||||
if (requestContext.getUriInfo().getQueryParameters()
|
MultivaluedMap<String, String> queryParameters =
|
||||||
.containsKey("delete")) {
|
requestContext.getUriInfo().getQueryParameters();
|
||||||
|
|
||||||
|
if (queryParameters.containsKey("delete")) {
|
||||||
//aws cli doesn't send proper Content-Type and by default POST requests
|
//aws cli doesn't send proper Content-Type and by default POST requests
|
||||||
//processed as form-url-encoded. Here we can fix this.
|
//processed as form-url-encoded. Here we can fix this.
|
||||||
requestContext.getHeaders()
|
requestContext.getHeaders()
|
||||||
.putSingle("Content-Type", MediaType.APPLICATION_XML);
|
.putSingle("Content-Type", MediaType.APPLICATION_XML);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (requestContext.getUriInfo().getQueryParameters()
|
if (queryParameters.containsKey("uploadId")) {
|
||||||
.containsKey("uploadId")) {
|
|
||||||
//aws cli doesn't send proper Content-Type and by default POST requests
|
//aws cli doesn't send proper Content-Type and by default POST requests
|
||||||
//processed as form-url-encoded. Here we can fix this.
|
//processed as form-url-encoded. Here we can fix this.
|
||||||
requestContext.getHeaders()
|
requestContext.getHeaders()
|
||||||
.putSingle("Content-Type", MediaType.APPLICATION_XML);
|
.putSingle("Content-Type", MediaType.APPLICATION_XML);
|
||||||
}
|
} else if (queryParameters.containsKey("uploads")) {
|
||||||
|
// uploads defined but uploadId is not --> this is the creation of the
|
||||||
|
// multi-part-upload requests.
|
||||||
|
//
|
||||||
|
//In AWS SDK for go uses application/octet-stream which also
|
||||||
|
//should be fixed to route the request to the right jaxrs method.
|
||||||
|
//
|
||||||
|
//Should be empty instead of XML as the body is empty which can not be
|
||||||
|
//serialized as as CompleteMultipartUploadRequest
|
||||||
|
requestContext.getHeaders()
|
||||||
|
.putSingle("Content-Type", MULTIPART_UPLOAD_MARKER);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,11 @@ import org.apache.hadoop.hdds.server.BaseHttpServer;
|
||||||
*/
|
*/
|
||||||
public class S3GatewayHttpServer extends BaseHttpServer {
|
public class S3GatewayHttpServer extends BaseHttpServer {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default offset between two filters.
|
||||||
|
*/
|
||||||
|
public static final int FILTER_PRIORITY_DO_AFTER = 50;
|
||||||
|
|
||||||
public S3GatewayHttpServer(Configuration conf,
|
public S3GatewayHttpServer(Configuration conf,
|
||||||
String name) throws IOException {
|
String name) throws IOException {
|
||||||
super(conf, name);
|
super(conf, name);
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ozone.s3;
|
package org.apache.hadoop.ozone.s3;
|
||||||
|
|
||||||
|
import javax.annotation.Priority;
|
||||||
import javax.inject.Inject;
|
import javax.inject.Inject;
|
||||||
import javax.ws.rs.container.ContainerRequestContext;
|
import javax.ws.rs.container.ContainerRequestContext;
|
||||||
import javax.ws.rs.container.ContainerRequestFilter;
|
import javax.ws.rs.container.ContainerRequestFilter;
|
||||||
|
@ -46,8 +47,11 @@ import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_DOMAIN_NA
|
||||||
|
|
||||||
@Provider
|
@Provider
|
||||||
@PreMatching
|
@PreMatching
|
||||||
|
@Priority(VirtualHostStyleFilter.PRIORITY)
|
||||||
public class VirtualHostStyleFilter implements ContainerRequestFilter {
|
public class VirtualHostStyleFilter implements ContainerRequestFilter {
|
||||||
|
|
||||||
|
public static final int PRIORITY = 100;
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(
|
private static final Logger LOG = LoggerFactory.getLogger(
|
||||||
VirtualHostStyleFilter.class);
|
VirtualHostStyleFilter.class);
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ozone.s3.endpoint;
|
package org.apache.hadoop.ozone.s3.endpoint;
|
||||||
|
|
||||||
|
import javax.ws.rs.Consumes;
|
||||||
import javax.ws.rs.DELETE;
|
import javax.ws.rs.DELETE;
|
||||||
import javax.ws.rs.DefaultValue;
|
import javax.ws.rs.DefaultValue;
|
||||||
import javax.ws.rs.GET;
|
import javax.ws.rs.GET;
|
||||||
|
@ -58,6 +59,7 @@ import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
|
||||||
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
|
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
|
||||||
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
|
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
|
||||||
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
|
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
|
||||||
|
import org.apache.hadoop.ozone.s3.HeaderPreprocessor;
|
||||||
import org.apache.hadoop.ozone.s3.SignedChunksInputStream;
|
import org.apache.hadoop.ozone.s3.SignedChunksInputStream;
|
||||||
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
|
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
|
||||||
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
|
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
|
||||||
|
@ -417,33 +419,19 @@ public class ObjectEndpoint extends EndpointBase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize MultiPartUpload request.
|
||||||
|
* <p>
|
||||||
|
* Note: the specific content type is set by the HeaderPreprocessor.
|
||||||
|
*/
|
||||||
@POST
|
@POST
|
||||||
@Produces(MediaType.APPLICATION_XML)
|
@Produces(MediaType.APPLICATION_XML)
|
||||||
public Response multipartUpload(
|
@Consumes(HeaderPreprocessor.MULTIPART_UPLOAD_MARKER)
|
||||||
|
public Response initializeMultipartUpload(
|
||||||
@PathParam("bucket") String bucket,
|
@PathParam("bucket") String bucket,
|
||||||
@PathParam("path") String key,
|
@PathParam("path") String key
|
||||||
@QueryParam("uploads") String uploads,
|
)
|
||||||
@QueryParam("uploadId") @DefaultValue("") String uploadID,
|
throws IOException, OS3Exception {
|
||||||
CompleteMultipartUploadRequest request) throws IOException, OS3Exception {
|
|
||||||
if (!uploadID.equals("")) {
|
|
||||||
//Complete Multipart upload request.
|
|
||||||
return completeMultipartUpload(bucket, key, uploadID, request);
|
|
||||||
} else {
|
|
||||||
// Initiate Multipart upload request.
|
|
||||||
return initiateMultipartUpload(bucket, key);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Initiate Multipart upload request.
|
|
||||||
* @param bucket
|
|
||||||
* @param key
|
|
||||||
* @return Response
|
|
||||||
* @throws IOException
|
|
||||||
* @throws OS3Exception
|
|
||||||
*/
|
|
||||||
private Response initiateMultipartUpload(String bucket, String key) throws
|
|
||||||
IOException, OS3Exception {
|
|
||||||
try {
|
try {
|
||||||
OzoneBucket ozoneBucket = getBucket(bucket);
|
OzoneBucket ozoneBucket = getBucket(bucket);
|
||||||
String storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
|
String storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
|
||||||
|
@ -473,7 +461,6 @@ public class ObjectEndpoint extends EndpointBase {
|
||||||
multipartUploadInitiateResponse.setKey(key);
|
multipartUploadInitiateResponse.setKey(key);
|
||||||
multipartUploadInitiateResponse.setUploadID(multipartInfo.getUploadID());
|
multipartUploadInitiateResponse.setUploadID(multipartInfo.getUploadID());
|
||||||
|
|
||||||
|
|
||||||
return Response.status(Status.OK).entity(
|
return Response.status(Status.OK).entity(
|
||||||
multipartUploadInitiateResponse).build();
|
multipartUploadInitiateResponse).build();
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
|
@ -484,18 +471,15 @@ public class ObjectEndpoint extends EndpointBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Complete Multipart upload request.
|
* Complete a multipart upload.
|
||||||
* @param bucket
|
|
||||||
* @param key
|
|
||||||
* @param uploadID
|
|
||||||
* @param multipartUploadRequest
|
|
||||||
* @return Response
|
|
||||||
* @throws IOException
|
|
||||||
* @throws OS3Exception
|
|
||||||
*/
|
*/
|
||||||
private Response completeMultipartUpload(String bucket, String key, String
|
@POST
|
||||||
uploadID, CompleteMultipartUploadRequest multipartUploadRequest) throws
|
@Produces(MediaType.APPLICATION_XML)
|
||||||
IOException, OS3Exception {
|
public Response completeMultipartUpload(@PathParam("bucket") String bucket,
|
||||||
|
@PathParam("path") String key,
|
||||||
|
@QueryParam("uploadId") @DefaultValue("") String uploadID,
|
||||||
|
CompleteMultipartUploadRequest multipartUploadRequest)
|
||||||
|
throws IOException, OS3Exception {
|
||||||
OzoneBucket ozoneBucket = getBucket(bucket);
|
OzoneBucket ozoneBucket = getBucket(bucket);
|
||||||
Map<Integer, String> partsMap = new TreeMap<>();
|
Map<Integer, String> partsMap = new TreeMap<>();
|
||||||
List<CompleteMultipartUploadRequest.Part> partList =
|
List<CompleteMultipartUploadRequest.Part> partList =
|
||||||
|
|
|
@ -56,7 +56,7 @@ public class TestAbortMultipartUpload {
|
||||||
rest.setHeaders(headers);
|
rest.setHeaders(headers);
|
||||||
rest.setClient(client);
|
rest.setClient(client);
|
||||||
|
|
||||||
Response response = rest.multipartUpload(bucket, key, "", "", null);
|
Response response = rest.initializeMultipartUpload(bucket, key);
|
||||||
|
|
||||||
assertEquals(response.getStatus(), 200);
|
assertEquals(response.getStatus(), 200);
|
||||||
MultipartUploadInitiateResponse multipartUploadInitiateResponse =
|
MultipartUploadInitiateResponse multipartUploadInitiateResponse =
|
||||||
|
|
|
@ -60,7 +60,7 @@ public class TestInitiateMultipartUpload {
|
||||||
rest.setHeaders(headers);
|
rest.setHeaders(headers);
|
||||||
rest.setClient(client);
|
rest.setClient(client);
|
||||||
|
|
||||||
Response response = rest.multipartUpload(bucket, key, "", "", null);
|
Response response = rest.initializeMultipartUpload(bucket, key);
|
||||||
|
|
||||||
assertEquals(response.getStatus(), 200);
|
assertEquals(response.getStatus(), 200);
|
||||||
MultipartUploadInitiateResponse multipartUploadInitiateResponse =
|
MultipartUploadInitiateResponse multipartUploadInitiateResponse =
|
||||||
|
@ -69,7 +69,7 @@ public class TestInitiateMultipartUpload {
|
||||||
String uploadID = multipartUploadInitiateResponse.getUploadID();
|
String uploadID = multipartUploadInitiateResponse.getUploadID();
|
||||||
|
|
||||||
// Calling again should return different uploadID.
|
// Calling again should return different uploadID.
|
||||||
response = rest.multipartUpload(bucket, key, "", "", null);
|
response = rest.initializeMultipartUpload(bucket, key);
|
||||||
assertEquals(response.getStatus(), 200);
|
assertEquals(response.getStatus(), 200);
|
||||||
multipartUploadInitiateResponse =
|
multipartUploadInitiateResponse =
|
||||||
(MultipartUploadInitiateResponse) response.getEntity();
|
(MultipartUploadInitiateResponse) response.getEntity();
|
||||||
|
|
|
@ -61,7 +61,7 @@ public class TestListParts {
|
||||||
REST.setHeaders(headers);
|
REST.setHeaders(headers);
|
||||||
REST.setClient(client);
|
REST.setClient(client);
|
||||||
|
|
||||||
Response response = REST.multipartUpload(BUCKET, KEY, "", "", null);
|
Response response = REST.initializeMultipartUpload(BUCKET, KEY);
|
||||||
MultipartUploadInitiateResponse multipartUploadInitiateResponse =
|
MultipartUploadInitiateResponse multipartUploadInitiateResponse =
|
||||||
(MultipartUploadInitiateResponse) response.getEntity();
|
(MultipartUploadInitiateResponse) response.getEntity();
|
||||||
assertNotNull(multipartUploadInitiateResponse.getUploadID());
|
assertNotNull(multipartUploadInitiateResponse.getUploadID());
|
||||||
|
|
|
@ -70,7 +70,7 @@ public class TestMultipartUploadComplete {
|
||||||
|
|
||||||
private String initiateMultipartUpload(String key) throws IOException,
|
private String initiateMultipartUpload(String key) throws IOException,
|
||||||
OS3Exception {
|
OS3Exception {
|
||||||
Response response = REST.multipartUpload(BUCKET, key, "", "", null);
|
Response response = REST.initializeMultipartUpload(BUCKET, key);
|
||||||
MultipartUploadInitiateResponse multipartUploadInitiateResponse =
|
MultipartUploadInitiateResponse multipartUploadInitiateResponse =
|
||||||
(MultipartUploadInitiateResponse) response.getEntity();
|
(MultipartUploadInitiateResponse) response.getEntity();
|
||||||
assertNotNull(multipartUploadInitiateResponse.getUploadID());
|
assertNotNull(multipartUploadInitiateResponse.getUploadID());
|
||||||
|
@ -99,7 +99,7 @@ public class TestMultipartUploadComplete {
|
||||||
private void completeMultipartUpload(String key,
|
private void completeMultipartUpload(String key,
|
||||||
CompleteMultipartUploadRequest completeMultipartUploadRequest,
|
CompleteMultipartUploadRequest completeMultipartUploadRequest,
|
||||||
String uploadID) throws IOException, OS3Exception {
|
String uploadID) throws IOException, OS3Exception {
|
||||||
Response response = REST.multipartUpload(BUCKET, key, "", uploadID,
|
Response response = REST.completeMultipartUpload(BUCKET, key, uploadID,
|
||||||
completeMultipartUploadRequest);
|
completeMultipartUploadRequest);
|
||||||
|
|
||||||
assertEquals(response.getStatus(), 200);
|
assertEquals(response.getStatus(), 200);
|
||||||
|
|
|
@ -67,7 +67,7 @@ public class TestPartUpload {
|
||||||
@Test
|
@Test
|
||||||
public void testPartUpload() throws Exception {
|
public void testPartUpload() throws Exception {
|
||||||
|
|
||||||
Response response = REST.multipartUpload(BUCKET, KEY, "", "", null);
|
Response response = REST.initializeMultipartUpload(BUCKET, KEY);
|
||||||
MultipartUploadInitiateResponse multipartUploadInitiateResponse =
|
MultipartUploadInitiateResponse multipartUploadInitiateResponse =
|
||||||
(MultipartUploadInitiateResponse) response.getEntity();
|
(MultipartUploadInitiateResponse) response.getEntity();
|
||||||
assertNotNull(multipartUploadInitiateResponse.getUploadID());
|
assertNotNull(multipartUploadInitiateResponse.getUploadID());
|
||||||
|
@ -86,7 +86,7 @@ public class TestPartUpload {
|
||||||
@Test
|
@Test
|
||||||
public void testPartUploadWithOverride() throws Exception {
|
public void testPartUploadWithOverride() throws Exception {
|
||||||
|
|
||||||
Response response = REST.multipartUpload(BUCKET, KEY, "", "", null);
|
Response response = REST.initializeMultipartUpload(BUCKET, KEY);
|
||||||
MultipartUploadInitiateResponse multipartUploadInitiateResponse =
|
MultipartUploadInitiateResponse multipartUploadInitiateResponse =
|
||||||
(MultipartUploadInitiateResponse) response.getEntity();
|
(MultipartUploadInitiateResponse) response.getEntity();
|
||||||
assertNotNull(multipartUploadInitiateResponse.getUploadID());
|
assertNotNull(multipartUploadInitiateResponse.getUploadID());
|
||||||
|
|
Loading…
Reference in New Issue