mirror of https://github.com/apache/druid.git
Fix the offset setting in GoogleStorage#get (#10449)
* Fix the offset in get of GCP object * upgrade compute dependency * fix version * review comments * missed
This commit is contained in:
parent
d09fd8b035
commit
d057c5149f
|
@ -88,7 +88,7 @@
|
|||
<dependency>
|
||||
<groupId>com.google.apis</groupId>
|
||||
<artifactId>google-api-services-compute</artifactId>
|
||||
<version>v1-rev214-1.25.0</version>
|
||||
<version>${com.google.apis.compute.version}</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
|
|
|
@ -33,10 +33,6 @@
|
|||
<relativePath>../../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
<properties>
|
||||
<com.google.apis.storage.version>v1-rev158-${com.google.apis.client.version}</com.google.apis.storage.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.druid</groupId>
|
||||
|
|
|
@ -51,11 +51,9 @@ public class GoogleStorage
|
|||
public InputStream get(final String bucket, final String path, long start) throws IOException
|
||||
{
|
||||
final Get get = storage.objects().get(bucket, path);
|
||||
if (start > 0) {
|
||||
get.getMediaHttpDownloader().setBytesDownloaded(start);
|
||||
}
|
||||
get.getMediaHttpDownloader().setDirectDownloadEnabled(false);
|
||||
return get.executeMediaAsInputStream();
|
||||
InputStream inputStream = get.executeMediaAsInputStream();
|
||||
inputStream.skip(start);
|
||||
return inputStream;
|
||||
}
|
||||
|
||||
public void delete(final String bucket, final String path) throws IOException
|
||||
|
|
|
@ -142,10 +142,7 @@ public class GoogleTaskLogs implements TaskLogs
|
|||
start = 0;
|
||||
}
|
||||
|
||||
InputStream stream = new GoogleByteSource(storage, config.getBucket(), taskKey).openStream();
|
||||
stream.skip(start);
|
||||
|
||||
return stream;
|
||||
return new GoogleByteSource(storage, config.getBucket(), taskKey).openStream(start);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
|
|
|
@ -0,0 +1,90 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.storage.google;
|
||||
|
||||
import com.google.api.client.googleapis.testing.auth.oauth2.MockGoogleCredential;
|
||||
import com.google.api.client.http.ByteArrayContent;
|
||||
import com.google.api.client.http.HttpRequestInitializer;
|
||||
import com.google.api.client.json.jackson2.JacksonFactory;
|
||||
import com.google.api.client.testing.http.MockHttpTransport;
|
||||
import com.google.api.client.testing.http.MockLowLevelHttpRequest;
|
||||
import com.google.api.client.testing.http.MockLowLevelHttpResponse;
|
||||
import com.google.api.services.storage.Storage;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
public class GoogleStorageTest
|
||||
{
|
||||
@Test
|
||||
public void testGet() throws IOException
|
||||
{
|
||||
String content = "abcdefghij";
|
||||
MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
|
||||
response.setContent(content);
|
||||
GoogleStorage googleStorage = makeGoogleStorage(response);
|
||||
InputStream is = googleStorage.get("bucket", "path");
|
||||
String actual = GoogleTestUtils.readAsString(is);
|
||||
Assert.assertEquals(content, actual);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetWithOffset() throws IOException
|
||||
{
|
||||
String content = "abcdefghij";
|
||||
MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
|
||||
response.setContent(content);
|
||||
GoogleStorage googleStorage = makeGoogleStorage(response);
|
||||
InputStream is = googleStorage.get("bucket", "path", 2);
|
||||
String actual = GoogleTestUtils.readAsString(is);
|
||||
Assert.assertEquals(content.substring(2), actual);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsert() throws IOException
|
||||
{
|
||||
String content = "abcdefghij";
|
||||
MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
|
||||
response.addHeader("Location", "http://random-path");
|
||||
response.setContent("{}");
|
||||
MockHttpTransport transport = new MockHttpTransport.Builder().setLowLevelHttpResponse(response).build();
|
||||
GoogleStorage googleStorage = makeGoogleStorage(transport);
|
||||
googleStorage.insert("bucket", "path", new ByteArrayContent("text/html", StringUtils.toUtf8(content)));
|
||||
MockLowLevelHttpRequest request = transport.getLowLevelHttpRequest();
|
||||
String actual = request.getContentAsString();
|
||||
Assert.assertEquals(content, actual);
|
||||
}
|
||||
|
||||
private GoogleStorage makeGoogleStorage(MockLowLevelHttpResponse response)
|
||||
{
|
||||
MockHttpTransport transport = new MockHttpTransport.Builder().setLowLevelHttpResponse(response).build();
|
||||
return makeGoogleStorage(transport);
|
||||
}
|
||||
|
||||
private GoogleStorage makeGoogleStorage(MockHttpTransport transport)
|
||||
{
|
||||
HttpRequestInitializer initializer = new MockGoogleCredential.Builder().build();
|
||||
Storage storage = new Storage(transport, JacksonFactory.getDefaultInstance(), initializer);
|
||||
return new GoogleStorage(storage);
|
||||
}
|
||||
}
|
|
@ -117,7 +117,7 @@ public class GoogleTaskLogsTest extends EasyMockSupport
|
|||
final String logPath = PREFIX + "/" + TASKID;
|
||||
EasyMock.expect(storage.exists(BUCKET, logPath)).andReturn(true);
|
||||
EasyMock.expect(storage.size(BUCKET, logPath)).andReturn((long) testLog.length());
|
||||
EasyMock.expect(storage.get(BUCKET, logPath)).andReturn(new ByteArrayInputStream(StringUtils.toUtf8(testLog)));
|
||||
EasyMock.expect(storage.get(BUCKET, logPath, 0)).andReturn(new ByteArrayInputStream(StringUtils.toUtf8(testLog)));
|
||||
|
||||
replayAll();
|
||||
|
||||
|
@ -134,19 +134,21 @@ public class GoogleTaskLogsTest extends EasyMockSupport
|
|||
public void testStreamTaskLogWithPositiveOffset() throws Exception
|
||||
{
|
||||
final String testLog = "hello this is a log";
|
||||
|
||||
final int offset = 5;
|
||||
final String expectedLog = testLog.substring(offset);
|
||||
final String logPath = PREFIX + "/" + TASKID;
|
||||
EasyMock.expect(storage.exists(BUCKET, logPath)).andReturn(true);
|
||||
EasyMock.expect(storage.size(BUCKET, logPath)).andReturn((long) testLog.length());
|
||||
EasyMock.expect(storage.get(BUCKET, logPath)).andReturn(new ByteArrayInputStream(StringUtils.toUtf8(testLog)));
|
||||
EasyMock.expect(storage.get(BUCKET, logPath, offset))
|
||||
.andReturn(new ByteArrayInputStream(StringUtils.toUtf8(expectedLog)));
|
||||
|
||||
replayAll();
|
||||
|
||||
final Optional<ByteSource> byteSource = googleTaskLogs.streamTaskLog(TASKID, 5);
|
||||
final Optional<ByteSource> byteSource = googleTaskLogs.streamTaskLog(TASKID, offset);
|
||||
|
||||
final StringWriter writer = new StringWriter();
|
||||
IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
|
||||
Assert.assertEquals(writer.toString(), testLog.substring(5));
|
||||
Assert.assertEquals(writer.toString(), expectedLog);
|
||||
|
||||
verifyAll();
|
||||
}
|
||||
|
@ -155,19 +157,22 @@ public class GoogleTaskLogsTest extends EasyMockSupport
|
|||
public void testStreamTaskLogWithNegative() throws Exception
|
||||
{
|
||||
final String testLog = "hello this is a log";
|
||||
|
||||
final int offset = -3;
|
||||
final int internalOffset = testLog.length() + offset;
|
||||
final String expectedLog = testLog.substring(internalOffset);
|
||||
final String logPath = PREFIX + "/" + TASKID;
|
||||
EasyMock.expect(storage.exists(BUCKET, logPath)).andReturn(true);
|
||||
EasyMock.expect(storage.size(BUCKET, logPath)).andReturn((long) testLog.length());
|
||||
EasyMock.expect(storage.get(BUCKET, logPath)).andReturn(new ByteArrayInputStream(StringUtils.toUtf8(testLog)));
|
||||
EasyMock.expect(storage.get(BUCKET, logPath, internalOffset))
|
||||
.andReturn(new ByteArrayInputStream(StringUtils.toUtf8(expectedLog)));
|
||||
|
||||
replayAll();
|
||||
|
||||
final Optional<ByteSource> byteSource = googleTaskLogs.streamTaskLog(TASKID, -3);
|
||||
final Optional<ByteSource> byteSource = googleTaskLogs.streamTaskLog(TASKID, offset);
|
||||
|
||||
final StringWriter writer = new StringWriter();
|
||||
IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
|
||||
Assert.assertEquals(writer.toString(), testLog.substring(testLog.length() - 3));
|
||||
Assert.assertEquals(writer.toString(), expectedLog);
|
||||
|
||||
verifyAll();
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.google.api.client.util.DateTime;
|
|||
import com.google.api.services.storage.Storage;
|
||||
import com.google.api.services.storage.model.Objects;
|
||||
import com.google.api.services.storage.model.StorageObject;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.druid.java.util.common.DateTimes;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.easymock.EasyMock;
|
||||
|
@ -30,6 +31,8 @@ import org.easymock.EasyMockSupport;
|
|||
import org.easymock.IExpectationSetters;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.StringWriter;
|
||||
import java.math.BigInteger;
|
||||
import java.net.URI;
|
||||
import java.util.HashMap;
|
||||
|
@ -119,4 +122,11 @@ public class GoogleTestUtils extends EasyMockSupport
|
|||
resultExpectationSetter.andVoid();
|
||||
}
|
||||
}
|
||||
|
||||
public static String readAsString(InputStream is) throws IOException
|
||||
{
|
||||
final StringWriter writer = new StringWriter();
|
||||
IOUtils.copy(is, writer, "UTF-8");
|
||||
return writer.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4100,7 +4100,7 @@ name: Google Cloud Storage JSON API
|
|||
license_category: binary
|
||||
module: extensions/druid-google-extensions
|
||||
license_name: Apache License version 2.0
|
||||
version: v1-rev158-1.25.0
|
||||
version: v1-rev20190523-1.26.0
|
||||
libraries:
|
||||
- com.google.apis: google-api-services-storage
|
||||
|
||||
|
@ -4110,7 +4110,7 @@ name: Google Compute Engine API
|
|||
license_category: binary
|
||||
module: extensions/gce-extensions
|
||||
license_name: Apache License version 2.0
|
||||
version: v1-rev214-1.25.0
|
||||
version: v1-rev20190607-1.26.0
|
||||
libraries:
|
||||
- com.google.apis: google-api-services-compute
|
||||
|
||||
|
@ -4130,7 +4130,7 @@ name: Google APIs Client Library For Java
|
|||
license_category: binary
|
||||
module: java-core
|
||||
license_name: Apache License version 2.0
|
||||
version: 1.25.0
|
||||
version: 1.26.0
|
||||
libraries:
|
||||
- com.google.api-client: google-api-client
|
||||
|
||||
|
@ -4140,7 +4140,7 @@ name: Google HTTP Client Library For Java
|
|||
license_category: binary
|
||||
module: java-core
|
||||
license_name: Apache License version 2.0
|
||||
version: 1.25.0
|
||||
version: 1.26.0
|
||||
libraries:
|
||||
- com.google.http-client: google-http-client
|
||||
- com.google.http-client: google-http-client-jackson2
|
||||
|
|
5
pom.xml
5
pom.xml
|
@ -118,8 +118,9 @@
|
|||
<!-- When upgrading ZK, edit docs and integration tests as well (integration-tests/docker-base/setup.sh) -->
|
||||
<zookeeper.version>3.4.14</zookeeper.version>
|
||||
<checkerframework.version>2.5.7</checkerframework.version>
|
||||
<com.google.apis.client.version>1.25.0</com.google.apis.client.version>
|
||||
<com.google.apis.compute.version>v1-rev214-1.25.0</com.google.apis.compute.version>
|
||||
<com.google.apis.client.version>1.26.0</com.google.apis.client.version>
|
||||
<com.google.apis.compute.version>v1-rev20190607-${com.google.apis.client.version}</com.google.apis.compute.version>
|
||||
<com.google.apis.storage.version>v1-rev20190523-${com.google.apis.client.version}</com.google.apis.storage.version>
|
||||
<repoOrgId>apache.snapshots</repoOrgId>
|
||||
<repoOrgName>Apache Snapshot Repository</repoOrgName>
|
||||
<repoOrgUrl>https://repository.apache.org/snapshots</repoOrgUrl>
|
||||
|
|
Loading…
Reference in New Issue