[repository-azure] plugin should use Azure Storage SDK v12 for Java (#1302)

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
This commit is contained in:
Andriy Redko 2021-10-18 19:48:32 -04:00 committed by GitHub
parent c7f5c90a5f
commit 9612fe80b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
54 changed files with 1717 additions and 454 deletions

View File

@ -44,8 +44,30 @@ opensearchplugin {
}
dependencies {
api 'com.microsoft.azure:azure-storage:8.6.2'
api 'com.microsoft.azure:azure-keyvault-core:1.0.0'
api 'com.azure:azure-core:1.20.0'
api 'com.azure:azure-storage-common:12.12.0'
api 'com.azure:azure-core-http-netty:1.11.0'
api "io.netty:netty-codec-dns:${versions.netty}"
api "io.netty:netty-codec-socks:${versions.netty}"
api "io.netty:netty-codec-http2:${versions.netty}"
api "io.netty:netty-handler-proxy:${versions.netty}"
api "io.netty:netty-resolver-dns:${versions.netty}"
api "io.netty:netty-transport-native-unix-common:${versions.netty}"
implementation project(':modules:transport-netty4')
api 'com.azure:azure-storage-blob:12.13.0'
api 'org.reactivestreams:reactive-streams:1.0.3'
api 'io.projectreactor:reactor-core:3.4.10'
api 'io.projectreactor.netty:reactor-netty:1.0.11'
api 'io.projectreactor.netty:reactor-netty-core:1.0.11'
api 'io.projectreactor.netty:reactor-netty-http:1.0.11'
api "org.slf4j:slf4j-api:${versions.slf4j}"
api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson}"
api "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${versions.jackson}"
api "com.fasterxml.jackson.dataformat:jackson-dataformat-xml:${versions.jackson}"
api "com.fasterxml.jackson.module:jackson-module-jaxb-annotations:${versions.jackson}"
api 'org.codehaus.woodstox:stax2-api:4.2.1'
implementation 'com.fasterxml.woodstox:woodstox-core:6.1.1'
runtimeOnly 'com.google.guava:guava:30.1.1-jre'
api 'org.apache.commons:commons-lang3:3.4'
testImplementation project(':test:fixtures:azure-fixture')
@ -63,15 +85,108 @@ tasks.named("dependencyLicenses").configure {
mapping from: /jersey-.*/, to: 'jersey'
mapping from: /jaxb-.*/, to: 'jaxb'
mapping from: /stax-.*/, to: 'stax'
mapping from: /netty-.*/, to: 'netty'
mapping from: /reactor-.*/, to: 'reactor'
}
thirdPartyAudit {
ignoreMissingClasses(
// Optional and not enabled by Elasticsearch
'org.slf4j.Logger',
'org.slf4j.LoggerFactory',
'com.google.common.util.concurrent.internal.InternalFutureFailureAccess',
'com.google.common.util.concurrent.internal.InternalFutures'
'com.google.common.util.concurrent.internal.InternalFutures',
'com.azure.storage.internal.avro.implementation.AvroObject',
'com.azure.storage.internal.avro.implementation.AvroReader',
'com.azure.storage.internal.avro.implementation.AvroReaderFactory',
'com.azure.storage.internal.avro.implementation.schema.AvroSchema',
'com.ctc.wstx.shaded.msv_core.driver.textui.Driver',
'io.micrometer.core.instrument.Clock',
'io.micrometer.core.instrument.Counter',
'io.micrometer.core.instrument.Counter$Builder',
'io.micrometer.core.instrument.DistributionSummary',
'io.micrometer.core.instrument.DistributionSummary$Builder',
'io.micrometer.core.instrument.Gauge',
'io.micrometer.core.instrument.Gauge$Builder',
'io.micrometer.core.instrument.Meter',
'io.micrometer.core.instrument.MeterRegistry',
'io.micrometer.core.instrument.Metrics',
'io.micrometer.core.instrument.Tag',
'io.micrometer.core.instrument.Tags',
'io.micrometer.core.instrument.Timer',
'io.micrometer.core.instrument.Timer$Builder',
'io.micrometer.core.instrument.Timer$Sample',
'io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics',
'io.micrometer.core.instrument.composite.CompositeMeterRegistry',
'io.micrometer.core.instrument.search.Search',
'io.netty.channel.epoll.Epoll',
'io.netty.channel.epoll.EpollDatagramChannel',
'io.netty.channel.epoll.EpollDomainDatagramChannel',
'io.netty.channel.epoll.EpollDomainSocketChannel',
'io.netty.channel.epoll.EpollEventLoopGroup',
'io.netty.channel.epoll.EpollServerDomainSocketChannel',
'io.netty.channel.epoll.EpollServerSocketChannel',
'io.netty.channel.epoll.EpollSocketChannel',
'io.netty.channel.kqueue.KQueue',
'io.netty.channel.kqueue.KQueueDatagramChannel',
'io.netty.channel.kqueue.KQueueDomainDatagramChannel',
'io.netty.channel.kqueue.KQueueDomainSocketChannel',
'io.netty.channel.kqueue.KQueueEventLoopGroup',
'io.netty.channel.kqueue.KQueueServerDomainSocketChannel',
'io.netty.channel.kqueue.KQueueServerSocketChannel',
'io.netty.channel.kqueue.KQueueSocketChannel',
'io.netty.channel.unix.DomainDatagramChannel',
'io.netty.handler.codec.haproxy.HAProxyMessage',
'io.netty.handler.codec.haproxy.HAProxyMessageDecoder',
'io.netty.incubator.channel.uring.IOUring',
'io.netty.incubator.channel.uring.IOUringDatagramChannel',
'io.netty.incubator.channel.uring.IOUringEventLoopGroup',
'io.netty.incubator.channel.uring.IOUringServerSocketChannel',
'io.netty.incubator.channel.uring.IOUringSocketChannel',
'javax.activation.DataHandler',
'javax.activation.DataSource',
'javax.xml.bind.JAXBElement',
'javax.xml.bind.annotation.XmlAccessOrder',
'javax.xml.bind.annotation.XmlAccessType',
'javax.xml.bind.annotation.XmlAccessorOrder',
'javax.xml.bind.annotation.XmlAccessorType',
'javax.xml.bind.annotation.XmlAttribute',
'javax.xml.bind.annotation.XmlElement',
'javax.xml.bind.annotation.XmlElement$DEFAULT',
'javax.xml.bind.annotation.XmlElementRef',
'javax.xml.bind.annotation.XmlElementRefs',
'javax.xml.bind.annotation.XmlElementWrapper',
'javax.xml.bind.annotation.XmlElements',
'javax.xml.bind.annotation.XmlEnum',
'javax.xml.bind.annotation.XmlEnumValue',
'javax.xml.bind.annotation.XmlID',
'javax.xml.bind.annotation.XmlIDREF',
'javax.xml.bind.annotation.XmlRootElement',
'javax.xml.bind.annotation.XmlSeeAlso',
'javax.xml.bind.annotation.XmlTransient',
'javax.xml.bind.annotation.XmlType',
'javax.xml.bind.annotation.XmlValue',
'javax.xml.bind.annotation.adapters.XmlAdapter',
'javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter',
'javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter$DEFAULT',
'javax.xml.bind.annotation.adapters.XmlJavaTypeAdapters',
'kotlin.TypeCastException',
'kotlin.collections.ArraysKt',
'kotlin.jvm.JvmClassMappingKt',
'kotlin.jvm.functions.Function0',
'kotlin.jvm.functions.Function1',
'kotlin.jvm.internal.FunctionReference',
'kotlin.jvm.internal.Intrinsics',
'kotlin.jvm.internal.Reflection',
'kotlin.jvm.internal.markers.KMappedMarker',
'kotlin.reflect.KClass',
'kotlin.reflect.KDeclarationContainer',
'kotlin.sequences.Sequence',
'org.osgi.framework.BundleActivator',
'org.osgi.framework.BundleContext',
'org.slf4j.impl.StaticLoggerBinder',
'org.slf4j.impl.StaticMDCBinder',
'org.slf4j.impl.StaticMarkerBinder',
'reactor.blockhound.BlockHound$Builder',
'reactor.blockhound.integration.BlockHoundIntegration'
)
ignoreViolations(
@ -89,7 +204,8 @@ thirdPartyAudit {
'com.google.common.util.concurrent.AbstractFuture$UnsafeAtomicHelper$1',
'com.google.common.hash.LittleEndianByteArray$UnsafeByteArray',
'com.google.common.primitives.UnsignedBytes$LexicographicalComparatorHolder$UnsafeComparator',
'com.google.common.primitives.UnsignedBytes$LexicographicalComparatorHolder$UnsafeComparator$1'
'com.google.common.primitives.UnsignedBytes$LexicographicalComparatorHolder$UnsafeComparator$1',
'reactor.core.publisher.Traces$SharedSecretsCallSiteSupplierFactory$TracingException'
)
}

View File

@ -0,0 +1 @@
a98c6bd18aa2066ecd8b39bf7ac51bd8e7307851

View File

@ -0,0 +1 @@
2177ae2dc9cff849e49f9180bf6b95b8e2c78e1b

View File

@ -1 +0,0 @@
e89dd5e621e21b753096ec6a03f203c01482c612

View File

@ -1 +0,0 @@
d1b6de7264205e2441c667dfee5b002bbac61644

View File

@ -0,0 +1 @@
2743c7489761b18da1f578d934bb0b0363c2b2db

View File

@ -0,0 +1 @@
3657650e3e17ce1f82b3a6fc5b68ba1005b869f9

View File

@ -0,0 +1,8 @@
This copy of Jackson JSON processor streaming parser/generator is licensed under the
Apache (Software) License, version 2.0 ("the License").
See the License for details about distribution rights, and the
specific rights regarding derivate works.
You may obtain a copy of the License at:
http://www.apache.org/licenses/LICENSE-2.0

View File

@ -0,0 +1,20 @@
# Jackson JSON processor
Jackson is a high-performance, Free/Open Source JSON processing library.
It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
been in development since 2007.
It is currently developed by a community of developers, as well as supported
commercially by FasterXML.com.
## Licensing
Jackson core and extension components may be licensed under different licenses.
To find the details that apply to this artifact see the accompanying LICENSE file.
For more information, including possible other licensing options, contact
FasterXML.com (http://fasterxml.com).
## Credits
A list of contributors may be found from CREDITS file, which is included
in some artifacts (usually source distributions); but is always available
from the source code management (SCM) system project uses.

View File

@ -0,0 +1 @@
52d929d5bb21d0186fe24c09624cc3ee4bafc3b3

View File

@ -0,0 +1 @@
b064cf057f23d3d35390328c5030847efeffedde

View File

@ -0,0 +1 @@
4b872e5a9f7e6644c2dd8d7358ed9fad714d7c90

View File

@ -0,0 +1 @@
a0a9870b681a72789c5c6bdc380e45ab719c6aa3

View File

@ -0,0 +1 @@
02b389d7206327e54ae31f709ab75a4a3f33e148

View File

@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -0,0 +1,116 @@
The Netty Project
=================
Please visit the Netty web site for more information:
* http://netty.io/
Copyright 2011 The Netty Project
The Netty Project licenses this file to you under the Apache License,
version 2.0 (the "License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at:
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
Also, please refer to each LICENSE.<component>.txt file, which is located in
the 'license' directory of the distribution file, for the license terms of the
components that this product depends on.
-------------------------------------------------------------------------------
This product contains the extensions to Java Collections Framework which has
been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene:
* LICENSE:
* license/LICENSE.jsr166y.txt (Public Domain)
* HOMEPAGE:
* http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/
* http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/
This product contains a modified version of Robert Harder's Public Domain
Base64 Encoder and Decoder, which can be obtained at:
* LICENSE:
* license/LICENSE.base64.txt (Public Domain)
* HOMEPAGE:
* http://iharder.sourceforge.net/current/java/base64/
This product contains a modified version of 'JZlib', a re-implementation of
zlib in pure Java, which can be obtained at:
* LICENSE:
* license/LICENSE.jzlib.txt (BSD Style License)
* HOMEPAGE:
* http://www.jcraft.com/jzlib/
This product contains a modified version of 'Webbit', a Java event based
WebSocket and HTTP server:
* LICENSE:
* license/LICENSE.webbit.txt (BSD License)
* HOMEPAGE:
* https://github.com/joewalnes/webbit
This product optionally depends on 'Protocol Buffers', Google's data
interchange format, which can be obtained at:
* LICENSE:
* license/LICENSE.protobuf.txt (New BSD License)
* HOMEPAGE:
* http://code.google.com/p/protobuf/
This product optionally depends on 'Bouncy Castle Crypto APIs' to generate
a temporary self-signed X.509 certificate when the JVM does not provide the
equivalent functionality. It can be obtained at:
* LICENSE:
* license/LICENSE.bouncycastle.txt (MIT License)
* HOMEPAGE:
* http://www.bouncycastle.org/
This product optionally depends on 'SLF4J', a simple logging facade for Java,
which can be obtained at:
* LICENSE:
* license/LICENSE.slf4j.txt (MIT License)
* HOMEPAGE:
* http://www.slf4j.org/
This product optionally depends on 'Apache Commons Logging', a logging
framework, which can be obtained at:
* LICENSE:
* license/LICENSE.commons-logging.txt (Apache License 2.0)
* HOMEPAGE:
* http://commons.apache.org/logging/
This product optionally depends on 'Apache Log4J', a logging framework,
which can be obtained at:
* LICENSE:
* license/LICENSE.log4j.txt (Apache License 2.0)
* HOMEPAGE:
* http://logging.apache.org/log4j/
This product optionally depends on 'JBoss Logging', a logging framework,
which can be obtained at:
* LICENSE:
* license/LICENSE.jboss-logging.txt (GNU LGPL 2.1)
* HOMEPAGE:
* http://anonsvn.jboss.org/repos/common/common-logging-spi/
This product optionally depends on 'Apache Felix', an open source OSGi
framework implementation, which can be obtained at:
* LICENSE:
* license/LICENSE.felix.txt (Apache License 2.0)
* HOMEPAGE:
* http://felix.apache.org/

View File

@ -0,0 +1 @@
82d019ef4b32b5bc66a73895f9d557d15d910931

View File

@ -0,0 +1 @@
b23b7fbffa4de30f336c2cc8bd1951403d1bebaa

View File

@ -0,0 +1 @@
37401ea0d02a86ccd529dc9bb70241bda18e42ff

View File

@ -0,0 +1 @@
46050c8dafd8d97ea6d04c861b75db20fe4ac39f

View File

@ -0,0 +1 @@
23fad2710b2a5e7f5c69ecf2893f784f9f7f365b

View File

@ -0,0 +1 @@
f5d4ad1f995ec40c20220ad9ec75b14172ac9320

View File

@ -0,0 +1 @@
d9fb7a7926ffa635b3dcaa5049fb2bfa25b3e7d0

View File

@ -0,0 +1,21 @@
MIT No Attribution
Copyright 2014 Reactive Streams
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
https://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "{}"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright {yyyy} {name of copyright owner}
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -0,0 +1 @@
4fac3e29002904d750987aeb6249ff1cd47c5fd5

View File

@ -0,0 +1 @@
3af49856e8ad9fe47a741afa7f56601e61723e1d

View File

@ -0,0 +1 @@
cfb3a45b62945a46a45a50050ec3b7ae8f5272f6

View File

@ -0,0 +1 @@
70dbc1f1c47e50f910c192928025621ed3d280d7

View File

@ -0,0 +1 @@
8619e95939167fb37245b5670135e4feb0ec7d50

View File

@ -0,0 +1,21 @@
Copyright (c) 2004-2014 QOS.ch
All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@ -0,0 +1 @@
a3f7325c52240418c2ba257b103c3c550e140c83

View File

@ -0,0 +1,23 @@
Copyright (c) 2008 FasterXML LLC info@fasterxml.com
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -0,0 +1 @@
989bb31963ed1758b95c7c4381a91592a9a8df61

View File

@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -31,15 +31,17 @@
package org.opensearch.repositories.azure;
import com.microsoft.azure.storage.Constants;
import com.microsoft.azure.storage.RetryExponentialRetry;
import com.microsoft.azure.storage.RetryPolicyFactory;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.common.implementation.Constants;
import com.azure.storage.common.policy.RequestRetryOptions;
import com.azure.storage.common.policy.RetryPolicyType;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import fixture.azure.AzureHttpHandler;
import reactor.core.scheduler.Schedulers;
import org.junit.AfterClass;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.regex.Regex;
import org.opensearch.common.settings.MockSecureSettings;
@ -59,6 +61,10 @@ import java.util.regex.Pattern;
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint")
public class AzureBlobStoreRepositoryTests extends OpenSearchMockAPIBasedRepositoryIntegTestCase {
@AfterClass
public static void shutdownSchedulers() {
Schedulers.shutdownNow();
}
@Override
protected String repositoryType() {
@ -96,7 +102,7 @@ public class AzureBlobStoreRepositoryTests extends OpenSearchMockAPIBasedReposit
secureSettings.setString(AzureStorageSettings.ACCOUNT_SETTING.getConcreteSettingForNamespace("test").getKey(), "account");
secureSettings.setString(AzureStorageSettings.KEY_SETTING.getConcreteSettingForNamespace("test").getKey(), key);
final String endpoint = "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=" + httpServerUrl();
final String endpoint = "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=" + httpServerUrl() + "/";
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(AzureStorageSettings.ENDPOINT_SUFFIX_SETTING.getConcreteSettingForNamespace("test").getKey(), endpoint)
@ -118,15 +124,14 @@ public class AzureBlobStoreRepositoryTests extends OpenSearchMockAPIBasedReposit
AzureStorageService createAzureStoreService(final Settings settings) {
return new AzureStorageService(settings) {
@Override
RetryPolicyFactory createRetryPolicy(final AzureStorageSettings azureStorageSettings) {
return new RetryExponentialRetry(1, 100, 500, azureStorageSettings.getMaxRetries());
RequestRetryOptions createRetryPolicy(final AzureStorageSettings azureStorageSettings, String secondaryHost) {
return new RequestRetryOptions(RetryPolicyType.EXPONENTIAL, azureStorageSettings.getMaxRetries(),
1, 100L, 500L, secondaryHost);
}
@Override
BlobRequestOptions getBlobRequestOptionsForWriteBlob() {
BlobRequestOptions options = new BlobRequestOptions();
options.setSingleBlobPutThresholdInBytes(Math.toIntExact(ByteSizeUnit.MB.toBytes(1)));
return options;
ParallelTransferOptions getBlobRequestOptionsForWriteBlob() {
return new ParallelTransferOptions().setMaxSingleUploadSizeLong(ByteSizeUnit.MB.toBytes(1));
}
};
}
@ -165,11 +170,8 @@ public class AzureBlobStoreRepositoryTests extends OpenSearchMockAPIBasedReposit
@Override
protected String requestUniqueId(final HttpExchange exchange) {
final String requestId = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER);
final String range = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.STORAGE_RANGE_HEADER);
return exchange.getRequestMethod()
+ " " + requestId
+ (range != null ? " " + range : "");
final String requestId = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.CLIENT_REQUEST_ID);
return exchange.getRequestMethod() + " " + requestId;
}
}
@ -180,6 +182,7 @@ public class AzureBlobStoreRepositoryTests extends OpenSearchMockAPIBasedReposit
private static class AzureHTTPStatsCollectorHandler extends HttpStatsCollectorHandler {
private static final Pattern listPattern = Pattern.compile("GET /[a-zA-Z0-9]+\\??.+");
private static final Pattern getPattern = Pattern.compile("GET /[^?/]+/[^?/]+\\??.*");
private AzureHTTPStatsCollectorHandler(HttpHandler delegate) {
super(delegate);
@ -187,7 +190,7 @@ public class AzureBlobStoreRepositoryTests extends OpenSearchMockAPIBasedReposit
@Override
protected void maybeTrack(String request, Headers headers) {
if (Regex.simpleMatch("GET /*/*", request)) {
if (getPattern.matcher(request).matches()) {
trackRequest("GetBlob");
} else if (Regex.simpleMatch("HEAD /*/*", request)) {
trackRequest("GetBlobProperties");

View File

@ -32,10 +32,14 @@
package org.opensearch.repositories.azure;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import reactor.core.scheduler.Schedulers;
import com.azure.core.util.Context;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.models.BlobStorageException;
import org.junit.AfterClass;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.master.AcknowledgedResponse;
@ -57,6 +61,10 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
public class AzureStorageCleanupThirdPartyTests extends AbstractThirdPartyRepositoryTestCase {
@AfterClass
public static void shutdownSchedulers() {
Schedulers.shutdownNow();
}
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
@ -117,15 +125,15 @@ public class AzureStorageCleanupThirdPartyTests extends AbstractThirdPartyReposi
repository.threadPool().generic().execute(ActionRunnable.wrap(future, l -> {
final AzureBlobStore blobStore = (AzureBlobStore) repository.blobStore();
final String account = "default";
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = blobStore.getService().client(account);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(blobStore.toString());
final Tuple<BlobServiceClient, Supplier<Context>> client = blobStore.getService().client(account);
final BlobContainerClient blobContainer = client.v1().getBlobContainerClient(blobStore.toString());
try {
SocketAccess.doPrivilegedException(() -> blobContainer.exists(null, null, client.v2().get()));
SocketAccess.doPrivilegedException(() -> blobContainer.existsWithResponse(null, client.v2().get()));
future.onFailure(new RuntimeException(
"The SAS token used in this test allowed for checking container existence. This test only supports tokens " +
"that grant only the documented permission requirements for the Azure repository plugin."));
} catch (StorageException e) {
if (e.getHttpStatusCode() == HttpURLConnection.HTTP_FORBIDDEN) {
} catch (BlobStorageException e) {
if (e.getStatusCode() == HttpURLConnection.HTTP_FORBIDDEN) {
future.onResponse(null);
} else {
future.onFailure(e);

View File

@ -32,9 +32,10 @@
package org.opensearch.repositories.azure;
import com.microsoft.azure.storage.Constants;
import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.StorageException;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.specialized.BlobInputStream;
import com.azure.storage.common.implementation.Constants;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
@ -49,6 +50,7 @@ import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.support.AbstractBlobContainer;
import org.opensearch.threadpool.ThreadPool;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
@ -59,6 +61,10 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
public class AzureBlobContainer extends AbstractBlobContainer {
/**
* The default minimum read size, in bytes, for a {@link BlobInputStream} or {@link FileInputStream}.
*/
public static final int DEFAULT_MINIMUM_READ_SIZE_IN_BYTES = 4 * Constants.MB;
private final Logger logger = LogManager.getLogger(AzureBlobContainer.class);
private final AzureBlobStore blobStore;
@ -77,7 +83,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
logger.trace("blobExists({})", blobName);
try {
return blobStore.blobExists(buildKey(blobName));
} catch (URISyntaxException | StorageException e) {
} catch (URISyntaxException | BlobStorageException e) {
logger.warn("can not access [{}] in container {{}}: {}", blobName, blobStore, e.getMessage());
}
return false;
@ -96,8 +102,8 @@ public class AzureBlobContainer extends AbstractBlobContainer {
}
try {
return blobStore.getInputStream(buildKey(blobName), position, length);
} catch (StorageException e) {
if (e.getHttpStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
} catch (BlobStorageException e) {
if (e.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new NoSuchFileException(e.getMessage());
}
throw new IOException(e);
@ -118,7 +124,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
@Override
public long readBlobPreferredLength() {
return Constants.DEFAULT_MINIMUM_READ_SIZE_IN_BYTES;
return DEFAULT_MINIMUM_READ_SIZE_IN_BYTES;
}
@Override
@ -126,7 +132,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize);
try {
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
} catch (URISyntaxException|StorageException e) {
} catch (URISyntaxException | BlobStorageException e) {
throw new IOException("Can not write blob " + blobName, e);
}
}
@ -140,7 +146,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
public DeleteResult delete() throws IOException {
try {
return blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME));
} catch (URISyntaxException | StorageException e) {
} catch (URISyntaxException | BlobStorageException e) {
throw new IOException(e);
}
}
@ -161,8 +167,8 @@ public class AzureBlobContainer extends AbstractBlobContainer {
logger.trace("deleteBlob({})", blobName);
try {
blobStore.deleteBlob(buildKey(blobName));
} catch (StorageException e) {
if (e.getHttpStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) {
} catch (BlobStorageException e) {
if (e.getStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) {
throw new IOException(e);
}
} catch (URISyntaxException e) {
@ -184,7 +190,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
try {
return blobStore.listBlobsByPrefix(keyPath, prefix);
} catch (URISyntaxException | StorageException e) {
} catch (URISyntaxException | BlobStorageException e) {
logger.warn("can not access [{}] in container {{}}: {}", prefix, blobStore, e.getMessage());
throw new IOException(e);
}
@ -201,7 +207,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
final BlobPath path = path();
try {
return blobStore.children(path);
} catch (URISyntaxException | StorageException e) {
} catch (URISyntaxException | BlobStorageException e) {
throw new IOException("Failed to list children in path [" + path.buildAsString() + "].", e);
}
}

View File

@ -32,24 +32,28 @@
package org.opensearch.repositories.azure;
import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.RequestCompletedEvent;
import com.microsoft.azure.storage.StorageErrorCodeStrings;
import com.microsoft.azure.storage.StorageEvent;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
import com.microsoft.azure.storage.blob.ListBlobItem;
import com.azure.core.http.HttpMethod;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.rest.Response;
import com.azure.core.util.Context;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobItemProperties;
import com.azure.storage.blob.models.BlobListDetails;
import com.azure.storage.blob.models.BlobRange;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.ListBlobsOptions;
import com.azure.storage.blob.options.BlobParallelUploadOptions;
import com.azure.storage.common.implementation.Constants;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.util.Throwables;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.metadata.RepositoryMetadata;
@ -62,29 +66,26 @@ import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.support.PlainBlobMetadata;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.repositories.azure.AzureRepository.Repository;
import org.opensearch.threadpool.ThreadPool;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.FileAlreadyExistsException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -103,10 +104,7 @@ public class AzureBlobStore implements BlobStore {
private final LocationMode locationMode;
private final Stats stats = new Stats();
private final Consumer<HttpURLConnection> getMetricsCollector;
private final Consumer<HttpURLConnection> listMetricsCollector;
private final Consumer<HttpURLConnection> uploadMetricsCollector;
private final BiConsumer<HttpRequest, HttpResponse> metricsCollector;
public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service, ThreadPool threadPool) {
this.container = Repository.CONTAINER_SETTING.get(metadata.settings());
@ -118,22 +116,27 @@ public class AzureBlobStore implements BlobStore {
final Map<String, AzureStorageSettings> prevSettings = this.service.refreshAndClearCache(emptyMap());
final Map<String, AzureStorageSettings> newSettings = AzureStorageSettings.overrideLocationMode(prevSettings, this.locationMode);
this.service.refreshAndClearCache(newSettings);
this.getMetricsCollector = (httpURLConnection) -> {
if (httpURLConnection.getRequestMethod().equals("HEAD")) {
stats.headOperations.incrementAndGet();
this.metricsCollector = (request, response) -> {
if (response.getStatusCode() >= 300) {
return;
}
assert httpURLConnection.getRequestMethod().equals("GET");
stats.getOperations.incrementAndGet();
};
this.listMetricsCollector = (httpURLConnection) -> {
assert httpURLConnection.getRequestMethod().equals("GET");
final HttpMethod method = request.getHttpMethod();
if (method.equals(HttpMethod.HEAD)) {
stats.headOperations.incrementAndGet();
return;
} else if (method.equals(HttpMethod.GET)) {
final String query = request.getUrl().getQuery();
final String queryParams = (query == null) ? "" : query;
if (queryParams.contains("comp=list")) {
stats.listOperations.incrementAndGet();
};
this.uploadMetricsCollector = (httpURLConnection -> {
assert httpURLConnection.getRequestMethod().equals("PUT");
String queryParams = httpURLConnection.getURL().getQuery() == null ? "" : httpURLConnection.getURL().getQuery();
} else {
stats.getOperations.incrementAndGet();
}
} else if (method.equals(HttpMethod.PUT)) {
final String query = request.getUrl().getQuery();
final String queryParams = (query == null) ? "" : query;
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list
@ -147,7 +150,8 @@ public class AzureBlobStore implements BlobStore {
// it's possible that the URI parameters contain additional parameters unrelated to the upload type.
stats.putOperations.incrementAndGet();
}
});
}
};
}
@Override
@ -173,58 +177,63 @@ public class AzureBlobStore implements BlobStore {
@Override
public void close() {
service.close();
}
public boolean blobExists(String blob) throws URISyntaxException, StorageException {
public boolean blobExists(String blob) throws URISyntaxException, BlobStorageException {
// Container name must be lower case.
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client();
final OperationContext context = hookMetricCollector(client.v2().get(), getMetricsCollector);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
final Tuple<BlobServiceClient, Supplier<Context>> client = client();
final BlobContainerClient blobContainer = client.v1().getBlobContainerClient(container);
return SocketAccess.doPrivilegedException(() -> {
final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
return azureBlob.exists(null, null, context);
final BlobClient azureBlob = blobContainer.getBlobClient(blob);
final Response<Boolean> response = azureBlob.existsWithResponse(timeout(), client.v2().get());
return response.getValue();
});
}
public void deleteBlob(String blob) throws URISyntaxException, StorageException {
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client();
public void deleteBlob(String blob) throws URISyntaxException, BlobStorageException {
final Tuple<BlobServiceClient, Supplier<Context>> client = client();
// Container name must be lower case.
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
final BlobContainerClient blobContainer = client.v1().getBlobContainerClient(container);
logger.trace(() -> new ParameterizedMessage("delete blob for container [{}], blob [{}]", container, blob));
SocketAccess.doPrivilegedVoidException(() -> {
final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
final BlobClient azureBlob = blobContainer.getBlobClient(blob);
logger.trace(() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blob));
azureBlob.delete(DeleteSnapshotsOption.NONE, null, null, client.v2().get());
final Response<Void> response = azureBlob.deleteWithResponse(null, null, timeout(), client.v2().get());
logger.trace(() -> new ParameterizedMessage("container [{}]: blob [{}] deleted status [{}].", container,
blob, response.getStatusCode()));
});
}
public DeleteResult deleteBlobDirectory(String path, Executor executor)
throws URISyntaxException, StorageException, IOException {
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client();
final OperationContext context = hookMetricCollector(client.v2().get(), listMetricsCollector);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
throws URISyntaxException, BlobStorageException, IOException {
final Tuple<BlobServiceClient, Supplier<Context>> client = client();
final BlobContainerClient blobContainer = client.v1().getBlobContainerClient(container);
final Collection<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
final AtomicLong outstanding = new AtomicLong(1L);
final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
final AtomicLong blobsDeleted = new AtomicLong();
final AtomicLong bytesDeleted = new AtomicLong();
final ListBlobsOptions listBlobsOptions = new ListBlobsOptions().setPrefix(path);
SocketAccess.doPrivilegedVoidException(() -> {
for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true,
EnumSet.noneOf(BlobListingDetails.class), null, context)) {
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
final String blobPath = blobItem.getUri().getPath().substring(1 + container.length() + 1);
for (final BlobItem blobItem : blobContainer.listBlobs(listBlobsOptions, timeout())) {
// Skipping prefixes as those are not deletable and should not be there
assert (blobItem.isPrefix() == null || !blobItem.isPrefix()) : "Only blobs (not prefixes) are expected";
outstanding.incrementAndGet();
executor.execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
final long len;
if (blobItem instanceof CloudBlob) {
len = ((CloudBlob) blobItem).getProperties().getLength();
} else {
len = -1L;
}
deleteBlob(blobPath);
final long len = blobItem.getProperties().getContentLength();
final BlobClient azureBlob = blobContainer.getBlobClient(blobItem.getName());
logger.trace(() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.",
container, blobItem.getName()));
final Response<Void> response = azureBlob.deleteWithResponse(null, null, timeout(), client.v2().get());
logger.trace(() -> new ParameterizedMessage("container [{}]: blob [{}] deleted status [{}].", container,
blobItem.getName(), response.getStatusCode()));
blobsDeleted.incrementAndGet();
if (len >= 0) {
bytesDeleted.addAndGet(len);
@ -257,79 +266,77 @@ public class AzureBlobStore implements BlobStore {
return new DeleteResult(blobsDeleted.get(), bytesDeleted.get());
}
public InputStream getInputStream(String blob, long position, @Nullable Long length) throws URISyntaxException, StorageException {
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client();
final OperationContext context = hookMetricCollector(client.v2().get(), getMetricsCollector);
final CloudBlockBlob blockBlobReference = client.v1().getContainerReference(container).getBlockBlobReference(blob);
public InputStream getInputStream(String blob, long position, @Nullable Long length) throws URISyntaxException, BlobStorageException {
final Tuple<BlobServiceClient, Supplier<Context>> client = client();
final BlobContainerClient blobContainer = client.v1().getBlobContainerClient(container);
final BlobClient azureBlob = blobContainer.getBlobClient(blob);
logger.trace(() -> new ParameterizedMessage("reading container [{}], blob [{}]", container, blob));
final long limit;
return SocketAccess.doPrivilegedException(() -> {
if (length == null) {
// Loading the blob attributes so we can get its length
SocketAccess.doPrivilegedVoidException(() -> blockBlobReference.downloadAttributes(null, null, context));
limit = blockBlobReference.getProperties().getLength() - position;
return azureBlob.openInputStream(new BlobRange(position), null);
} else {
return azureBlob.openInputStream(new BlobRange(position, length), null);
}
else {
limit = length;
}
final BlobInputStream blobInputStream = new BlobInputStream(limit, blockBlobReference, position, context);
if (length != null) {
// pre-filling the buffer in case of ranged reads so this method throws a 404 storage exception right away in case the blob
// does not exist
blobInputStream.fill();
}
return blobInputStream;
});
}
public Map<String, BlobMetadata> listBlobsByPrefix(String keyPath, String prefix)
throws URISyntaxException, StorageException {
throws URISyntaxException, BlobStorageException {
final Map<String, BlobMetadata> blobsBuilder = new HashMap<String, BlobMetadata>();
final Tuple<BlobServiceClient, Supplier<Context>> client = client();
final BlobContainerClient blobContainer = client.v1().getBlobContainerClient(container);
logger.trace(() -> new ParameterizedMessage("listing container [{}], keyPath [{}], prefix [{}]", container, keyPath, prefix));
// NOTE: this should be here: if (prefix == null) prefix = "";
// however, this is really inefficient since deleteBlobsByPrefix enumerates everything and
// then does a prefix match on the result; it should just call listBlobsByPrefix with the prefix!
final Map<String, BlobMetadata> blobsBuilder = new HashMap<String, BlobMetadata>();
final EnumSet<BlobListingDetails> enumBlobListingDetails = EnumSet.of(BlobListingDetails.METADATA);
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client();
final OperationContext context = hookMetricCollector(client.v2().get(), listMetricsCollector);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
logger.trace(() -> new ParameterizedMessage("listing container [{}], keyPath [{}], prefix [{}]", container, keyPath, prefix));
final ListBlobsOptions listBlobsOptions = new ListBlobsOptions()
.setDetails(new BlobListDetails().setRetrieveMetadata(true))
.setPrefix(keyPath + (prefix == null ? "" : prefix));
SocketAccess.doPrivilegedVoidException(() -> {
for (final ListBlobItem blobItem : blobContainer.listBlobs(keyPath + (prefix == null ? "" : prefix), false,
enumBlobListingDetails, null, context)) {
final URI uri = blobItem.getUri();
logger.trace(() -> new ParameterizedMessage("blob url [{}]", uri));
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
final String blobPath = uri.getPath().substring(1 + container.length() + 1);
if (blobItem instanceof CloudBlob) {
final BlobProperties properties = ((CloudBlob) blobItem).getProperties();
final String name = blobPath.substring(keyPath.length());
logger.trace(() -> new ParameterizedMessage("blob url [{}], name [{}], size [{}]", uri, name, properties.getLength()));
blobsBuilder.put(name, new PlainBlobMetadata(name, properties.getLength()));
for (final BlobItem blobItem: blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())) {
// Skipping over the prefixes, only look for the blobs
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
continue;
}
final String name = getBlobName(blobItem.getName(), container, keyPath);
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name));
final BlobItemProperties properties = blobItem.getProperties();
logger.trace(() -> new ParameterizedMessage("blob name [{}], size [{}]", name, properties.getContentLength()));
blobsBuilder.put(name, new PlainBlobMetadata(name, properties.getContentLength()));
}
});
return MapBuilder.newMapBuilder(blobsBuilder).immutableMap();
}
public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException {
public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, BlobStorageException {
final Set<String> blobsBuilder = new HashSet<String>();
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client();
final OperationContext context = hookMetricCollector(client.v2().get(), listMetricsCollector);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
final Tuple<BlobServiceClient, Supplier<Context>> client = client();
final BlobContainerClient blobContainer = client.v1().getBlobContainerClient(container);
final String keyPath = path.buildAsString();
final EnumSet<BlobListingDetails> enumBlobListingDetails = EnumSet.of(BlobListingDetails.METADATA);
final ListBlobsOptions listBlobsOptions = new ListBlobsOptions()
.setDetails(new BlobListDetails().setRetrieveMetadata(true))
.setPrefix(keyPath);
SocketAccess.doPrivilegedVoidException(() -> {
for (ListBlobItem blobItem : blobContainer.listBlobs(keyPath, false, enumBlobListingDetails, null, context)) {
if (blobItem instanceof CloudBlobDirectory) {
final URI uri = blobItem.getUri();
logger.trace(() -> new ParameterizedMessage("blob url [{}]", uri));
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
for (final BlobItem blobItem: blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())) {
// Skipping over the blobs, only look for prefixes
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
// Expecting name in the form /container/keyPath.* and we want to strip off the /container/
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /.
// Lastly, we add the length of keyPath to the offset to strip this container's path.
final String uriPath = uri.getPath();
blobsBuilder.add(uriPath.substring(1 + container.length() + 1 + keyPath.length(), uriPath.length() - 1));
}
final String name = getBlobName(blobItem.getName(), container, keyPath).replaceAll("/$", "");
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name));
blobsBuilder.add(name);
}
};
});
return Collections.unmodifiableMap(blobsBuilder.stream().collect(
@ -337,44 +344,53 @@ public class AzureBlobStore implements BlobStore {
}
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
throws URISyntaxException, StorageException, IOException {
throws URISyntaxException, BlobStorageException, IOException {
assert inputStream.markSupported()
: "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken";
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize));
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client();
final OperationContext operationContext = hookMetricCollector(client().v2().get(), uploadMetricsCollector);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
final CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobName);
final Tuple<BlobServiceClient, Supplier<Context>> client = client();
final BlobContainerClient blobContainer = client.v1().getBlobContainerClient(container);
final BlobClient blob = blobContainer.getBlobClient(blobName);
try {
final AccessCondition accessCondition =
failIfAlreadyExists ? AccessCondition.generateIfNotExistsCondition() : AccessCondition.generateEmptyCondition();
SocketAccess.doPrivilegedVoidException(() ->
blob.upload(inputStream, blobSize, accessCondition, service.getBlobRequestOptionsForWriteBlob(), operationContext));
} catch (final StorageException se) {
if (failIfAlreadyExists && se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) {
final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
if (failIfAlreadyExists) {
blobRequestConditions.setIfNoneMatch(Constants.HeaderConstants.ETAG_WILDCARD);
}
SocketAccess.doPrivilegedVoidException(() -> {
final Response<?> response = blob.uploadWithResponse(
new BlobParallelUploadOptions(inputStream, blobSize)
.setRequestConditions(blobRequestConditions)
.setParallelTransferOptions(service.getBlobRequestOptionsForWriteBlob()),
timeout(), client.v2().get());
logger.trace(() -> new ParameterizedMessage("upload({}, stream, {}) - status [{}]",
blobName, blobSize, response.getStatusCode()));
});
} catch (final BlobStorageException se) {
if (failIfAlreadyExists && se.getStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
BlobErrorCode.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) {
throw new FileAlreadyExistsException(blobName, null, se.getMessage());
}
throw se;
} catch (final RuntimeException ex) {
// Since most of the logic is happening inside the reactive pipeline, the checked exceptions
// are swallowed and wrapped into runtime one (see please Exceptions.ReactiveException).
if (ex.getCause() != null) {
Throwables.rethrow(ex.getCause());
} else {
throw ex;
}
}
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {}) - done", blobName, blobSize));
}
private Tuple<CloudBlobClient, Supplier<OperationContext>> client() {
return service.client(clientName);
private Tuple<BlobServiceClient, Supplier<Context>> client() {
return service.client(clientName, metricsCollector);
}
private OperationContext hookMetricCollector(OperationContext context, Consumer<HttpURLConnection> metricCollector) {
context.getRequestCompletedEventHandler().addListener(new StorageEvent<RequestCompletedEvent>() {
@Override
public void eventOccurred(RequestCompletedEvent eventArg) {
int statusCode = eventArg.getRequestResult().getStatusCode();
if (statusCode < 300) {
metricCollector.accept((HttpURLConnection) eventArg.getConnectionObject());
}
}
});
return context;
private Duration timeout() {
return service.getBlobRequestTimeout(clientName);
}
@Override
@ -382,6 +398,27 @@ public class AzureBlobStore implements BlobStore {
return stats.toMap();
}
/**
* Extracts the name of the blob from path or prefixed blob name
* @param pathOrName prefixed blob name or blob path
* @param container container
* @param keyPath key path
* @return blob name
*/
private String getBlobName(final String pathOrName, final String container, final String keyPath) {
String name = pathOrName;
if (name.matches("." + container + ".")) {
name = name.substring(1 + container.length() + 1);
}
if (name.startsWith(keyPath)) {
name = name.substring(keyPath.length());
}
return name;
}
private static class Stats {
private final AtomicLong getOperations = new AtomicLong();
@ -405,97 +442,4 @@ public class AzureBlobStore implements BlobStore {
"PutBlockList", putBlockListOperations.get());
}
}
/**
* Building our own input stream instead of using the SDK's {@link com.microsoft.azure.storage.blob.BlobInputStream}
* because that stream is highly inefficient in both memory and CPU use.
*/
private static class BlobInputStream extends InputStream {
/**
* Maximum number of bytes to fetch per read request and thus to buffer on heap at a time.
* Set to 4M because that's what {@link com.microsoft.azure.storage.blob.BlobInputStream} uses.
*/
private static final int MAX_READ_CHUNK_SIZE = ByteSizeUnit.MB.toIntBytes(4);
/**
* Using a {@link ByteArrayOutputStream} as a buffer instead of a byte array since the byte array APIs on the SDK are less
* efficient.
*/
private final ByteArrayOutputStream buffer;
private final long limit;
private final CloudBlockBlob blockBlobReference;
private final long start;
private final OperationContext context;
// current read position on the byte array backing #buffer
private int pos;
// current position up to which the contents of the blob where buffered
private long offset;
BlobInputStream(long limit, CloudBlockBlob blockBlobReference, long start, OperationContext context) {
this.limit = limit;
this.blockBlobReference = blockBlobReference;
this.start = start;
this.context = context;
buffer = new ByteArrayOutputStream(Math.min(MAX_READ_CHUNK_SIZE, Math.toIntExact(Math.min(limit, Integer.MAX_VALUE)))) {
@Override
public byte[] toByteArray() {
return buf;
}
};
pos = 0;
offset = 0;
}
@Override
public int read() throws IOException {
try {
fill();
} catch (StorageException | URISyntaxException ex) {
throw new IOException(ex);
}
if (pos == buffer.size()) {
return -1;
}
return buffer.toByteArray()[pos++];
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
try {
fill();
} catch (StorageException | URISyntaxException ex) {
throw new IOException(ex);
}
final int buffered = buffer.size();
int remaining = buffered - pos;
if (len > 0 && remaining == 0) {
return -1;
}
final int toRead = Math.min(remaining, len);
System.arraycopy(buffer.toByteArray(), pos, b, off, toRead);
pos += toRead;
return toRead;
}
void fill() throws StorageException, URISyntaxException {
if (pos == buffer.size()) {
final long toFill = Math.min(limit - this.offset, MAX_READ_CHUNK_SIZE);
if (toFill <= 0L) {
return;
}
buffer.reset();
SocketAccess.doPrivilegedVoidException(() -> blockBlobReference.downloadRange(
start + this.offset, toFill, buffer, null, null, context));
this.offset += buffer.size();
pos = 0;
}
}
}
}

View File

@ -32,7 +32,6 @@
package org.opensearch.repositories.azure;
import com.microsoft.azure.storage.LocationMode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
@ -142,6 +141,7 @@ public class AzureRepository extends MeteredBlobStoreRepository {
logger.debug(() -> new ParameterizedMessage(
"using container [{}], chunk_size [{}], compress [{}], base_path [{}]",
blobStore, chunkSize, isCompress(), basePath));
return blobStore;
}

View File

@ -32,14 +32,31 @@
package org.opensearch.repositories.azure;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.Constants;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.RetryPolicy;
import com.microsoft.azure.storage.RetryPolicyFactory;
import com.microsoft.azure.storage.RetryExponentialRetry;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import reactor.core.publisher.Mono;
import com.azure.core.http.HttpPipelineCallContext;
import com.azure.core.http.HttpPipelineNextPolicy;
import com.azure.core.http.HttpPipelinePosition;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.ProxyOptions;
import com.azure.core.http.ProxyOptions.Type;
import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import com.azure.storage.common.implementation.connectionstring.StorageConnectionString;
import com.azure.storage.common.implementation.connectionstring.StorageEndpoint;
import com.azure.storage.common.policy.RequestRetryOptions;
import com.azure.storage.common.policy.RetryPolicyType;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
@ -47,25 +64,42 @@ import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import java.net.URI;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URISyntaxException;
import java.security.AccessController;
import java.security.InvalidKeyException;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import static java.util.Collections.emptyMap;
public class AzureStorageService {
public class AzureStorageService implements AutoCloseable {
private final ClientLogger logger = new ClientLogger(AzureStorageService.class);
/**
* Maximum blob's block size size
*/
public static final ByteSizeValue MIN_CHUNK_SIZE = new ByteSizeValue(1, ByteSizeUnit.BYTES);
/**
* Maximum allowed blob size in Azure blob store.
* Maximum allowed blob's block size in Azure blob store.
*/
public static final ByteSizeValue MAX_CHUNK_SIZE = new ByteSizeValue(Constants.MAX_BLOB_SIZE, ByteSizeUnit.BYTES);
public static final ByteSizeValue MAX_CHUNK_SIZE = new ByteSizeValue(BlockBlobAsyncClient.MAX_STAGE_BLOCK_BYTES_LONG,
ByteSizeUnit.BYTES);
// 'package' for testing
volatile Map<String, AzureStorageSettings> storageSettings = emptyMap();
private final Map<AzureStorageSettings, ClientState> clients = new ConcurrentHashMap<>();
public AzureStorageService(Settings settings) {
// eagerly load client settings so that secure settings are read
@ -74,55 +108,139 @@ public class AzureStorageService {
}
/**
* Creates a {@code CloudBlobClient} on each invocation using the current client
* settings. CloudBlobClient is not thread safe and the settings can change,
* therefore the instance is not cache-able and should only be reused inside a
* thread for logically coupled ops. The {@code OperationContext} is used to
* specify the proxy, but a new context is *required* for each call.
* Obtains a {@code BlobServiceClient} on each invocation using the current client
* settings. BlobServiceClient is thread safe and and could be cached but the settings
* can change, therefore the instance might be recreated from scratch.
*
* @param clientName client name
* @return the {@code BlobServiceClient} instance and context
*/
public Tuple<CloudBlobClient, Supplier<OperationContext>> client(String clientName) {
public Tuple<BlobServiceClient, Supplier<Context>> client(String clientName) {
return client(clientName, (request, response) -> {});
}
/**
* Obtains a {@code BlobServiceClient} on each invocation using the current client
* settings. BlobServiceClient is thread safe and and could be cached but the settings
* can change, therefore the instance might be recreated from scratch.
* @param clientName client name
* @param statsCollector statistics collector
* @return the {@code BlobServiceClient} instance and context
*/
public Tuple<BlobServiceClient, Supplier<Context>> client(String clientName, BiConsumer<HttpRequest, HttpResponse> statsCollector) {
final AzureStorageSettings azureStorageSettings = this.storageSettings.get(clientName);
if (azureStorageSettings == null) {
throw new SettingsException("Unable to find client with name [" + clientName + "]");
}
// New Azure storage clients are thread-safe and do not hold any state so could be cached, see please:
// https://github.com/Azure/azure-storage-java/blob/master/V12%20Upgrade%20Story.md#v12-the-best-of-both-worlds
ClientState state = clients.get(azureStorageSettings);
if (state == null) {
state = clients.computeIfAbsent(azureStorageSettings, key -> {
try {
return new Tuple<>(buildClient(azureStorageSettings), () -> buildOperationContext(azureStorageSettings));
return buildClient(azureStorageSettings, statsCollector);
} catch (InvalidKeyException | URISyntaxException | IllegalArgumentException e) {
throw new SettingsException("Invalid azure client settings with name [" + clientName + "]", e);
}
});
}
private CloudBlobClient buildClient(AzureStorageSettings azureStorageSettings) throws InvalidKeyException, URISyntaxException {
final CloudBlobClient client = createClient(azureStorageSettings);
// Set timeout option if the user sets cloud.azure.storage.timeout or
// cloud.azure.storage.xxx.timeout (it's negative by default)
final long timeout = azureStorageSettings.getTimeout().getMillis();
if (timeout > 0) {
if (timeout > Integer.MAX_VALUE) {
throw new IllegalArgumentException("Timeout [" + azureStorageSettings.getTimeout() + "] exceeds 2,147,483,647ms.");
return new Tuple<>(state.getClient(), () -> buildOperationContext(azureStorageSettings));
}
client.getDefaultRequestOptions().setTimeoutIntervalInMs((int) timeout);
private ClientState buildClient(AzureStorageSettings azureStorageSettings, BiConsumer<HttpRequest, HttpResponse> statsCollector)
throws InvalidKeyException, URISyntaxException {
final BlobServiceClientBuilder builder = createClientBuilder(azureStorageSettings);
final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(new NioThreadFactory());
final NettyAsyncHttpClientBuilder clientBuilder = new NettyAsyncHttpClientBuilder()
.eventLoopGroup(eventLoopGroup);
final Proxy proxy = azureStorageSettings.getProxy();
if (proxy != null) {
final Type type = Arrays
.stream(Type.values())
.filter(t -> t.toProxyType().equals(proxy.type()))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Unsupported proxy type: " + proxy.type()));
clientBuilder.proxy(new ProxyOptions(type, (InetSocketAddress)proxy.address()));
}
builder.httpClient(clientBuilder.build());
// We define a default exponential retry policy
client.getDefaultRequestOptions().setRetryPolicyFactory(createRetryPolicy(azureStorageSettings));
client.getDefaultRequestOptions().setLocationMode(azureStorageSettings.getLocationMode());
return client;
return new ClientState(
applyLocationMode(builder, azureStorageSettings)
.addPolicy(new HttpStatsPolicy(statsCollector))
.buildClient(),
eventLoopGroup);
}
/**
* The location mode is not there in v12 APIs anymore but it is possible to mimic its semantics using
* retry options and combination of primary / secondary endpoints. Refer to migration guide for mode details:
* https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/storage/azure-storage-blob/migrationGuides/V8_V12.md#miscellaneous
*/
private BlobServiceClientBuilder applyLocationMode(final BlobServiceClientBuilder builder, final AzureStorageSettings settings) {
final StorageConnectionString storageConnectionString = StorageConnectionString.create(settings.getConnectString(), logger);
final StorageEndpoint endpoint = storageConnectionString.getBlobEndpoint();
if (endpoint == null || endpoint.getPrimaryUri() == null) {
throw new IllegalArgumentException("connectionString missing required settings to derive blob service primary endpoint.");
}
final LocationMode locationMode = settings.getLocationMode();
if (locationMode == LocationMode.PRIMARY_ONLY) {
builder.retryOptions(createRetryPolicy(settings, null));
} else if (locationMode == LocationMode.SECONDARY_ONLY) {
if (endpoint.getSecondaryUri() == null) {
throw new IllegalArgumentException("connectionString missing required settings to derive blob service secondary endpoint.");
}
builder
.endpoint(endpoint.getSecondaryUri())
.retryOptions(createRetryPolicy(settings, null));
} else if (locationMode == LocationMode.PRIMARY_THEN_SECONDARY) {
builder.retryOptions(createRetryPolicy(settings, endpoint.getSecondaryUri()));
} else if (locationMode == LocationMode.SECONDARY_THEN_PRIMARY) {
if (endpoint.getSecondaryUri() == null) {
throw new IllegalArgumentException("connectionString missing required settings to derive blob service secondary endpoint.");
}
builder
.endpoint(endpoint.getSecondaryUri())
.retryOptions(createRetryPolicy(settings, endpoint.getPrimaryUri()));
} else {
throw new IllegalArgumentException("Unsupported location mode: " + locationMode);
}
return builder;
}
private static BlobServiceClientBuilder createClientBuilder(AzureStorageSettings settings)
throws InvalidKeyException, URISyntaxException {
return new BlobServiceClientBuilder().connectionString(settings.getConnectString());
}
/**
* For the time being, create an empty context but the implementation could be extended.
* @param azureStorageSettings azure seetings
* @return context instance
*/
private static Context buildOperationContext(AzureStorageSettings azureStorageSettings) {
return Context.NONE;
}
// non-static, package private for testing
RetryPolicyFactory createRetryPolicy(final AzureStorageSettings azureStorageSettings) {
return new RetryExponentialRetry(RetryPolicy.DEFAULT_CLIENT_BACKOFF, azureStorageSettings.getMaxRetries());
}
private static CloudBlobClient createClient(AzureStorageSettings azureStorageSettings) throws InvalidKeyException, URISyntaxException {
final String connectionString = azureStorageSettings.getConnectString();
return CloudStorageAccount.parse(connectionString).createCloudBlobClient();
}
private static OperationContext buildOperationContext(AzureStorageSettings azureStorageSettings) {
final OperationContext context = new OperationContext();
context.setProxy(azureStorageSettings.getProxy());
return context;
RequestRetryOptions createRetryPolicy(final AzureStorageSettings azureStorageSettings, String secondaryHost) {
// We define a default exponential retry policy{
return new RequestRetryOptions(RetryPolicyType.EXPONENTIAL, azureStorageSettings.getMaxRetries(),
(Integer)null, null, null, secondaryHost);
}
/**
@ -134,31 +252,142 @@ public class AzureStorageService {
*/
public Map<String, AzureStorageSettings> refreshAndClearCache(Map<String, AzureStorageSettings> clientsSettings) {
final Map<String, AzureStorageSettings> prevSettings = this.storageSettings;
final Map<AzureStorageSettings, ClientState> prevClients = new HashMap<>(this.clients);
prevClients.values().forEach(this::closeInternally);
prevClients.clear();
this.storageSettings = MapBuilder.newMapBuilder(clientsSettings).immutableMap();
this.clients.clear();
// clients are built lazily by {@link client(String)}
return prevSettings;
}
/**
* Extract the blob name from a URI like https://myservice.azure.net/container/path/to/myfile
* It should remove the container part (first part of the path) and gives path/to/myfile
* @param uri URI to parse
* @return The blob name relative to the container
*/
static String blobNameFromUri(URI uri) {
final String path = uri.getPath();
// We remove the container name from the path
// The 3 magic number cames from the fact if path is /container/path/to/myfile
// First occurrence is empty "/"
// Second occurrence is "container
// Last part contains "path/to/myfile" which is what we want to get
final String[] splits = path.split("/", 3);
// We return the remaining end of the string
return splits[2];
@Override
public void close() {
this.clients.values().forEach(this::closeInternally);
this.clients.clear();
}
public Duration getBlobRequestTimeout(String clientName) {
final AzureStorageSettings azureStorageSettings = this.storageSettings.get(clientName);
if (azureStorageSettings == null) {
throw new SettingsException("Unable to find client with name [" + clientName + "]");
}
// Set timeout option if the user sets cloud.azure.storage.timeout or
// cloud.azure.storage.xxx.timeout (it's negative by default)
final long timeout = azureStorageSettings.getTimeout().getMillis();
if (timeout > 0) {
if (timeout > Integer.MAX_VALUE) {
throw new IllegalArgumentException("Timeout [" + azureStorageSettings.getTimeout() + "] exceeds 2,147,483,647ms.");
}
return Duration.ofMillis(timeout);
}
// package private for testing: returns null here (no custom transfer options);
// tests override this to force small upload chunk sizes.
ParallelTransferOptions getBlobRequestOptionsForWriteBlob() {
    return null;
}
/**
 * Gracefully shuts down the Netty event loop group held by the given cached client
 * state, blocking (uninterruptibly) until shutdown completes and logging a warning
 * on failure.
 */
private void closeInternally(ClientState state) {
    // Quiet period of 0 and a 5 second timeout for the graceful shutdown.
    final Future<?> shutdownFuture = state.getEventLoopGroup().shutdownGracefully(0, 5, TimeUnit.SECONDS);
    shutdownFuture.awaitUninterruptibly();
    if (shutdownFuture.isSuccess() == false) {
        logger.warning("Error closing Netty Event Loop group", shutdownFuture.cause());
    }
}
/**
 * Implements HTTP pipeline policy to collect statistics on API calls. See please:
 * https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/storage/azure-storage-blob/migrationGuides/V8_V12.md#miscellaneous
 */
private static class HttpStatsPolicy implements HttpPipelinePolicy {
    /** Callback invoked with each (request, response) pair observed by the pipeline. */
    private final BiConsumer<HttpRequest, HttpResponse> statsCollector;

    HttpStatsPolicy(final BiConsumer<HttpRequest, HttpResponse> statsCollector) {
        this.statsCollector = statsCollector;
    }

    @Override
    public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy nextPolicy) {
        // Capture the outgoing request up front so it can be paired with its response.
        final HttpRequest request = context.getHttpRequest();
        return nextPolicy.process().doOnNext(response -> statsCollector.accept(request, response));
    }

    @Override
    public HttpPipelinePosition getPipelinePosition() {
        // This policy must be in a position to see each retry
        return HttpPipelinePosition.PER_RETRY;
    }
}
/**
 * Helper class to hold the state of the cached clients and associated event groups to support
 * graceful shutdown logic.
 */
private static class ClientState {
    // Immutable pair: the cached service client and the Netty event loop group backing it.
    private final BlobServiceClient client;
    private final EventLoopGroup eventLoopGroup;

    ClientState(final BlobServiceClient client, final EventLoopGroup eventLoopGroup) {
        this.client = client;
        this.eventLoopGroup = eventLoopGroup;
    }

    /** The cached blob service client. */
    public BlobServiceClient getClient() {
        return this.client;
    }

    /** The event loop group associated with the client; used for graceful shutdown. */
    public EventLoopGroup getEventLoopGroup() {
        return this.eventLoopGroup;
    }
}
/**
 * The NIO thread factory which is aware of the SecurityManager
 */
private static class NioThreadFactory implements ThreadFactory {
    // Distinguishes thread name prefixes across factory instances.
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    NioThreadFactory() {
        // Use the SecurityManager's thread group when one is installed.
        final SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        namePrefix = "reactor-nio-" + poolNumber.getAndIncrement() + "-thread-";
    }

    @Override
    public Thread newThread(Runnable r) {
        // Run the task inside doPrivileged so worker threads keep the needed permissions.
        // See please: https://github.com/Azure/azure-sdk-for-java/pull/24374
        final Runnable privileged = () -> {
            AccessController.doPrivileged((PrivilegedAction<?>) () -> {
                r.run();
                return null;
            });
        };

        final Thread t = new Thread(group, privileged, namePrefix + threadNumber.getAndIncrement(), 0);
        // Normalize daemon flag and priority (same pattern as Executors.defaultThreadFactory()).
        if (t.isDaemon()) {
            t.setDaemon(false);
        }
        if (t.getPriority() != Thread.NORM_PRIORITY) {
            t.setPriority(Thread.NORM_PRIORITY);
        }
        return t;
    }
}
}

View File

@ -32,8 +32,6 @@
package org.opensearch.repositories.azure;
import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.RetryPolicy;
import org.opensearch.common.Nullable;
import org.opensearch.common.Strings;
import org.opensearch.common.collect.MapBuilder;
@ -74,7 +72,7 @@ final class AzureStorageSettings {
/** max_retries: Number of retries in case of Azure errors. Defaults to 3 (RetryPolicy.DEFAULT_CLIENT_RETRY_COUNT). */
public static final AffixSetting<Integer> MAX_RETRIES_SETTING =
Setting.affixKeySetting(AZURE_CLIENT_PREFIX_KEY, "max_retries",
(key) -> Setting.intSetting(key, RetryPolicy.DEFAULT_CLIENT_RETRY_COUNT, Setting.Property.NodeScope),
(key) -> Setting.intSetting(key, 3, Setting.Property.NodeScope),
() -> ACCOUNT_SETTING, () -> KEY_SETTING);
/**
* Azure endpoint suffix. Default to core.windows.net (CloudStorageAccount.DEFAULT_DNS).

View File

@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.repositories.azure;
/**
 * Specifies the location mode used to decide which location the request should be sent to. The enumeration
 * simulates the semantics of the LocationMode, which is gone in the new SDKv12.
 */
public enum LocationMode {
    /**
     * Requests should always be sent to the primary location.
     */
    PRIMARY_ONLY,

    /**
     * Requests should always be sent to the primary location first. If the request fails, it should be sent to the
     * secondary location.
     */
    PRIMARY_THEN_SECONDARY,

    /**
     * Requests should always be sent to the secondary location.
     */
    SECONDARY_ONLY,

    /**
     * Requests should always be sent to the secondary location first. If the request fails, it should be sent to the
     * primary location.
     */
    SECONDARY_THEN_PRIMARY
}

View File

@ -32,7 +32,7 @@
package org.opensearch.repositories.azure;
import com.microsoft.azure.storage.StorageException;
import com.azure.storage.blob.models.BlobStorageException;
import org.apache.logging.log4j.core.util.Throwables;
import org.opensearch.SpecialPermission;
@ -64,7 +64,7 @@ public final class SocketAccess {
}
}
public static <T> T doPrivilegedException(PrivilegedExceptionAction<T> operation) throws StorageException {
public static <T> T doPrivilegedException(PrivilegedExceptionAction<T> operation) throws BlobStorageException {
SpecialPermission.check();
try {
return AccessController.doPrivileged(operation);
@ -75,7 +75,7 @@ public final class SocketAccess {
}
}
public static void doPrivilegedVoidException(StorageRunnable action) throws StorageException, URISyntaxException {
public static void doPrivilegedVoidException(StorageRunnable action) throws BlobStorageException, URISyntaxException {
SpecialPermission.check();
try {
AccessController.doPrivileged((PrivilegedExceptionAction<Void>) () -> {
@ -89,7 +89,7 @@ public final class SocketAccess {
@FunctionalInterface
public interface StorageRunnable {
void executeCouldThrow() throws StorageException, URISyntaxException, IOException;
void executeCouldThrow() throws BlobStorageException, URISyntaxException, IOException;
}
}

View File

@ -32,6 +32,10 @@
grant {
  // azure client opens socket connections to access the repository
permission java.net.SocketPermission "*", "connect";
permission java.net.SocketPermission "*", "connect,resolve";
permission java.lang.RuntimePermission "setFactory";
permission java.net.NetPermission "getProxySelector";
permission java.lang.RuntimePermission "accessDeclaredMembers";
permission java.lang.reflect.ReflectPermission "suppressAccessChecks";
permission java.lang.RuntimePermission "setContextClassLoader";
};

View File

@ -31,13 +31,15 @@
package org.opensearch.repositories.azure;
import com.microsoft.azure.storage.Constants;
import com.microsoft.azure.storage.RetryExponentialRetry;
import com.microsoft.azure.storage.RetryPolicyFactory;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.common.policy.RequestRetryOptions;
import com.azure.storage.common.policy.RetryPolicyType;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import fixture.azure.AzureHttpHandler;
import reactor.core.scheduler.Schedulers;
import org.apache.http.HttpStatus;
import org.opensearch.cluster.metadata.RepositoryMetadata;
@ -63,6 +65,7 @@ import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import java.io.ByteArrayOutputStream;
@ -113,6 +116,7 @@ public class AzureBlobContainerRetriesTests extends OpenSearchTestCase {
private HttpServer httpServer;
private ThreadPool threadPool;
private AzureStorageService service;
@Before
public void setUp() throws Exception {
@ -124,21 +128,31 @@ public class AzureBlobContainerRetriesTests extends OpenSearchTestCase {
@After
public void tearDown() throws Exception {
if (service != null) {
service.close();
service = null;
}
httpServer.stop(0);
super.tearDown();
ThreadPool.terminate(threadPool, 10L, TimeUnit.SECONDS);
}
@AfterClass
public static void shutdownSchedulers() {
Schedulers.shutdownNow();
}
private BlobContainer createBlobContainer(final int maxRetries) {
final Settings.Builder clientSettings = Settings.builder();
final String clientName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
final InetSocketAddress address = httpServer.getAddress();
final String endpoint = "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=http://"
+ InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort();
+ InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort() + "/";
clientSettings.put(ENDPOINT_SUFFIX_SETTING.getConcreteSettingForNamespace(clientName).getKey(), endpoint);
clientSettings.put(MAX_RETRIES_SETTING.getConcreteSettingForNamespace(clientName).getKey(), maxRetries);
clientSettings.put(TIMEOUT_SETTING.getConcreteSettingForNamespace(clientName).getKey(), TimeValue.timeValueMillis(500));
clientSettings.put(TIMEOUT_SETTING.getConcreteSettingForNamespace(clientName).getKey(), TimeValue.timeValueMillis(1000));
final MockSecureSettings secureSettings = new MockSecureSettings();
secureSettings.setString(ACCOUNT_SETTING.getConcreteSettingForNamespace(clientName).getKey(), "account");
@ -146,17 +160,16 @@ public class AzureBlobContainerRetriesTests extends OpenSearchTestCase {
secureSettings.setString(KEY_SETTING.getConcreteSettingForNamespace(clientName).getKey(), key);
clientSettings.setSecureSettings(secureSettings);
final AzureStorageService service = new AzureStorageService(clientSettings.build()) {
service = new AzureStorageService(clientSettings.build()) {
@Override
RetryPolicyFactory createRetryPolicy(final AzureStorageSettings azureStorageSettings) {
return new RetryExponentialRetry(1, 10, 100, azureStorageSettings.getMaxRetries());
RequestRetryOptions createRetryPolicy(final AzureStorageSettings azureStorageSettings, String secondaryHost) {
return new RequestRetryOptions(RetryPolicyType.EXPONENTIAL, azureStorageSettings.getMaxRetries(),
1, 10L, 100L, secondaryHost);
}
@Override
BlobRequestOptions getBlobRequestOptionsForWriteBlob() {
BlobRequestOptions options = new BlobRequestOptions();
options.setSingleBlobPutThresholdInBytes(Math.toIntExact(ByteSizeUnit.MB.toBytes(1)));
return options;
ParallelTransferOptions getBlobRequestOptionsForWriteBlob() {
return new ParallelTransferOptions().setMaxSingleUploadSizeLong(ByteSizeUnit.MB.toBytes(1));
}
};
@ -181,13 +194,15 @@ public class AzureBlobContainerRetriesTests extends OpenSearchTestCase {
blobContainer.readBlob("read_nonexistent_blob", position, length);
}
});
assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("not found"));
assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("404"));
}
public void testReadBlobWithRetries() throws Exception {
final int maxRetries = randomIntBetween(1, 5);
final CountDown countDownHead = new CountDown(maxRetries);
final CountDown countDownGet = new CountDown(maxRetries);
// The request retry policy counts the first attempt as retry, so we need to
// account for that and increase the max retry count by one.
final int maxRetries = randomIntBetween(2, 6);
final CountDown countDownHead = new CountDown(maxRetries - 1);
final CountDown countDownGet = new CountDown(maxRetries - 1);
final byte[] bytes = randomBlobContent();
httpServer.createContext("/container/read_blob_max_retries", exchange -> {
try {
@ -195,7 +210,7 @@ public class AzureBlobContainerRetriesTests extends OpenSearchTestCase {
if ("HEAD".equals(exchange.getRequestMethod())) {
if (countDownHead.countDown()) {
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(bytes.length));
exchange.getResponseHeaders().add("Content-Length", String.valueOf(bytes.length));
exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
return;
@ -206,7 +221,7 @@ public class AzureBlobContainerRetriesTests extends OpenSearchTestCase {
assertThat(rangeStart, lessThan(bytes.length));
final int length = bytes.length - rangeStart;
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(length));
exchange.getResponseHeaders().add("Content-Length", String.valueOf(length));
exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), length);
exchange.getResponseBody().write(bytes, rangeStart, length);
@ -230,14 +245,20 @@ public class AzureBlobContainerRetriesTests extends OpenSearchTestCase {
}
public void testReadRangeBlobWithRetries() throws Exception {
final int maxRetries = randomIntBetween(1, 5);
final CountDown countDownGet = new CountDown(maxRetries);
// The request retry policy counts the first attempt as retry, so we need to
// account for that and increase the max retry count by one.
final int maxRetries = randomIntBetween(2, 6);
final CountDown countDownGet = new CountDown(maxRetries - 1);
final byte[] bytes = randomBlobContent();
httpServer.createContext("/container/read_range_blob_max_retries", exchange -> {
try {
Streams.readFully(exchange.getRequestBody());
if ("HEAD".equals(exchange.getRequestMethod())) {
throw new AssertionError("Should not HEAD blob for ranged reads");
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.getResponseHeaders().add("Content-Length", String.valueOf(bytes.length));
exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
return;
} else if ("GET".equals(exchange.getRequestMethod())) {
if (countDownGet.countDown()) {
final int rangeStart = getRangeStart(exchange);
@ -248,7 +269,7 @@ public class AzureBlobContainerRetriesTests extends OpenSearchTestCase {
final int length = (rangeEnd.get() - rangeStart) + 1;
assertThat(length, lessThanOrEqualTo(bytes.length - rangeStart));
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(length));
exchange.getResponseHeaders().add("Content-Length", String.valueOf(length));
exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), length);
exchange.getResponseBody().write(bytes, rangeStart, length);
@ -274,12 +295,16 @@ public class AzureBlobContainerRetriesTests extends OpenSearchTestCase {
}
public void testWriteBlobWithRetries() throws Exception {
final int maxRetries = randomIntBetween(1, 5);
final CountDown countDown = new CountDown(maxRetries);
// The request retry policy counts the first attempt as retry, so we need to
// account for that and increase the max retry count by one.
final int maxRetries = randomIntBetween(2, 6);
final CountDown countDown = new CountDown(maxRetries - 1);
final byte[] bytes = randomBlobContent();
httpServer.createContext("/container/write_blob_max_retries", exchange -> {
if ("PUT".equals(exchange.getRequestMethod())) {
exchange.getResponseHeaders().add("x-ms-request-server-encrypted", "false");
if (countDown.countDown()) {
final BytesReference body = Streams.readFully(exchange.getRequestBody());
if (Objects.deepEquals(bytes, BytesReference.toBytes(body))) {
@ -311,10 +336,12 @@ public class AzureBlobContainerRetriesTests extends OpenSearchTestCase {
}
public void testWriteLargeBlob() throws Exception {
final int maxRetries = randomIntBetween(1, 5);
// The request retry policy counts the first attempt as retry, so we need to
// account for that and increase the max retry count by one.
final int maxRetries = randomIntBetween(3, 6);
final int nbBlocks = randomIntBetween(1, 2);
final byte[] data = randomBytes(Constants.DEFAULT_STREAM_WRITE_IN_BYTES * nbBlocks);
final byte[] data = randomBytes(BlobClient.BLOB_DEFAULT_UPLOAD_BLOCK_SIZE * nbBlocks);
final int nbErrors = 2; // we want all requests to fail at least once
final AtomicInteger countDownUploads = new AtomicInteger(nbErrors * nbBlocks);
@ -322,14 +349,16 @@ public class AzureBlobContainerRetriesTests extends OpenSearchTestCase {
final Map<String, BytesReference> blocks = new ConcurrentHashMap<>();
httpServer.createContext("/container/write_large_blob", exchange -> {
if ("PUT".equals(exchange.getRequestMethod())) {
final Map<String, String> params = new HashMap<>();
if (exchange.getRequestURI().getQuery() != null) {
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
}
final String blockId = params.get("blockid");
if (Strings.hasText(blockId) && (countDownUploads.decrementAndGet() % 2 == 0)) {
blocks.put(blockId, Streams.readFully(exchange.getRequestBody()));
exchange.getResponseHeaders().add("x-ms-request-server-encrypted", "false");
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
exchange.close();
return;
@ -350,6 +379,7 @@ public class AzureBlobContainerRetriesTests extends OpenSearchTestCase {
block.writeTo(blob);
}
assertArrayEquals(data, blob.toByteArray());
exchange.getResponseHeaders().add("x-ms-request-server-encrypted", "false");
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
exchange.close();
return;
@ -365,7 +395,7 @@ public class AzureBlobContainerRetriesTests extends OpenSearchTestCase {
final BlobContainer blobContainer = createBlobContainer(maxRetries);
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", data), data.length)) {
blobContainer.writeBlob("write_large_blob", stream, data.length * nbBlocks, false);
blobContainer.writeBlob("write_large_blob", stream, data.length, false);
}
assertThat(countDownUploads.get(), equalTo(0));
assertThat(countDownComplete.isCountedDown(), is(true));

View File

@ -32,7 +32,9 @@
package org.opensearch.repositories.azure;
import com.microsoft.azure.storage.LocationMode;
import reactor.core.scheduler.Schedulers;
import org.junit.AfterClass;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
@ -49,6 +51,10 @@ import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
public class AzureRepositorySettingsTests extends OpenSearchTestCase {
@AfterClass
public static void shutdownSchedulers() {
Schedulers.shutdownNow();
}
private AzureRepository azureRepository(Settings settings) {
Settings internalSettings = Settings.builder()

View File

@ -32,9 +32,13 @@
package org.opensearch.repositories.azure;
import com.microsoft.azure.storage.RetryExponentialRetry;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.core.Base64;
import reactor.core.scheduler.Schedulers;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.common.policy.RequestRetryPolicy;
import org.junit.AfterClass;
import org.opensearch.common.settings.MockSecureSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
@ -50,19 +54,23 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Base64;
import java.util.Collections;
import java.util.Map;
import static org.opensearch.repositories.azure.AzureStorageService.blobNameFromUri;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class AzureStorageServiceTests extends OpenSearchTestCase {
@AfterClass
public static void shutdownSchedulers() {
Schedulers.shutdownNow();
}
public void testReadSecuredSettings() {
final Settings settings = Settings.builder().setSecureSettings(buildSecureSettings())
@ -95,10 +103,10 @@ public class AzureStorageServiceTests extends OpenSearchTestCase {
.put("azure.client.azure1.endpoint_suffix", "my_endpoint_suffix").build();
try (AzureRepositoryPlugin plugin = pluginWithSettingsValidation(settings)) {
final AzureStorageService azureStorageService = plugin.azureStoreService;
final CloudBlobClient client1 = azureStorageService.client("azure1").v1();
assertThat(client1.getEndpoint().toString(), equalTo("https://myaccount1.blob.my_endpoint_suffix"));
final CloudBlobClient client2 = azureStorageService.client("azure2").v1();
assertThat(client2.getEndpoint().toString(), equalTo("https://myaccount2.blob.core.windows.net"));
final BlobServiceClient client1 = azureStorageService.client("azure1").v1();
assertThat(client1.getAccountUrl(), equalTo("https://myaccount1.blob.my_endpoint_suffix"));
final BlobServiceClient client2 = azureStorageService.client("azure2").v1();
assertThat(client2.getAccountUrl(), equalTo("https://myaccount2.blob.core.windows.net"));
}
}
@ -117,28 +125,28 @@ public class AzureStorageServiceTests extends OpenSearchTestCase {
final Settings settings2 = Settings.builder().setSecureSettings(secureSettings2).build();
try (AzureRepositoryPlugin plugin = pluginWithSettingsValidation(settings1)) {
final AzureStorageService azureStorageService = plugin.azureStoreService;
final CloudBlobClient client11 = azureStorageService.client("azure1").v1();
assertThat(client11.getEndpoint().toString(), equalTo("https://myaccount11.blob.core.windows.net"));
final CloudBlobClient client12 = azureStorageService.client("azure2").v1();
assertThat(client12.getEndpoint().toString(), equalTo("https://myaccount12.blob.core.windows.net"));
final BlobServiceClient client11 = azureStorageService.client("azure1").v1();
assertThat(client11.getAccountUrl(), equalTo("https://myaccount11.blob.core.windows.net"));
final BlobServiceClient client12 = azureStorageService.client("azure2").v1();
assertThat(client12.getAccountUrl(), equalTo("https://myaccount12.blob.core.windows.net"));
// client 3 is missing
final SettingsException e1 = expectThrows(SettingsException.class, () -> azureStorageService.client("azure3"));
assertThat(e1.getMessage(), is("Unable to find client with name [azure3]"));
// update client settings
plugin.reload(settings2);
// old client 1 not changed
assertThat(client11.getEndpoint().toString(), equalTo("https://myaccount11.blob.core.windows.net"));
assertThat(client11.getAccountUrl(), equalTo("https://myaccount11.blob.core.windows.net"));
// new client 1 is changed
final CloudBlobClient client21 = azureStorageService.client("azure1").v1();
assertThat(client21.getEndpoint().toString(), equalTo("https://myaccount21.blob.core.windows.net"));
final BlobServiceClient client21 = azureStorageService.client("azure1").v1();
assertThat(client21.getAccountUrl(), equalTo("https://myaccount21.blob.core.windows.net"));
// old client 2 not changed
assertThat(client12.getEndpoint().toString(), equalTo("https://myaccount12.blob.core.windows.net"));
assertThat(client12.getAccountUrl(), equalTo("https://myaccount12.blob.core.windows.net"));
// new client2 is gone
final SettingsException e2 = expectThrows(SettingsException.class, () -> azureStorageService.client("azure2"));
assertThat(e2.getMessage(), is("Unable to find client with name [azure2]"));
// client 3 emerged
final CloudBlobClient client23 = azureStorageService.client("azure3").v1();
assertThat(client23.getEndpoint().toString(), equalTo("https://myaccount23.blob.core.windows.net"));
final BlobServiceClient client23 = azureStorageService.client("azure3").v1();
assertThat(client23.getAccountUrl(), equalTo("https://myaccount23.blob.core.windows.net"));
}
}
@ -149,16 +157,16 @@ public class AzureStorageServiceTests extends OpenSearchTestCase {
final Settings settings = Settings.builder().setSecureSettings(secureSettings).build();
try (AzureRepositoryPlugin plugin = pluginWithSettingsValidation(settings)) {
final AzureStorageService azureStorageService = plugin.azureStoreService;
final CloudBlobClient client11 = azureStorageService.client("azure1").v1();
assertThat(client11.getEndpoint().toString(), equalTo("https://myaccount1.blob.core.windows.net"));
final BlobServiceClient client11 = azureStorageService.client("azure1").v1();
assertThat(client11.getAccountUrl(), equalTo("https://myaccount1.blob.core.windows.net"));
// reinit with empty settings
final SettingsException e = expectThrows(SettingsException.class, () -> plugin.reload(Settings.EMPTY));
assertThat(e.getMessage(), is("If you want to use an azure repository, you need to define a client configuration."));
// existing client untouched
assertThat(client11.getEndpoint().toString(), equalTo("https://myaccount1.blob.core.windows.net"));
assertThat(client11.getAccountUrl(), equalTo("https://myaccount1.blob.core.windows.net"));
// new client also untouched
final CloudBlobClient client21 = azureStorageService.client("azure1").v1();
assertThat(client21.getEndpoint().toString(), equalTo("https://myaccount1.blob.core.windows.net"));
final BlobServiceClient client21 = azureStorageService.client("azure1").v1();
assertThat(client21.getAccountUrl(), equalTo("https://myaccount1.blob.core.windows.net"));
}
}
@ -178,14 +186,14 @@ public class AzureStorageServiceTests extends OpenSearchTestCase {
final Settings settings3 = Settings.builder().setSecureSettings(secureSettings3).build();
try (AzureRepositoryPlugin plugin = pluginWithSettingsValidation(settings1)) {
final AzureStorageService azureStorageService = plugin.azureStoreService;
final CloudBlobClient client11 = azureStorageService.client("azure1").v1();
assertThat(client11.getEndpoint().toString(), equalTo("https://myaccount1.blob.core.windows.net"));
final BlobServiceClient client11 = azureStorageService.client("azure1").v1();
assertThat(client11.getAccountUrl(), equalTo("https://myaccount1.blob.core.windows.net"));
final SettingsException e1 = expectThrows(SettingsException.class, () -> plugin.reload(settings2));
assertThat(e1.getMessage(), is("Neither a secret key nor a shared access token was set."));
final SettingsException e2 = expectThrows(SettingsException.class, () -> plugin.reload(settings3));
assertThat(e2.getMessage(), is("Both a secret as well as a shared access token were set."));
// existing client untouched
assertThat(client11.getEndpoint().toString(), equalTo("https://myaccount1.blob.core.windows.net"));
assertThat(client11.getAccountUrl(), equalTo("https://myaccount1.blob.core.windows.net"));
}
}
@ -201,23 +209,19 @@ public class AzureStorageServiceTests extends OpenSearchTestCase {
.put("azure.client.azure3.timeout", "30s")
.build();
final AzureStorageService azureStorageService = storageServiceWithSettingsValidation(timeoutSettings);
final CloudBlobClient client1 = azureStorageService.client("azure1").v1();
assertThat(client1.getDefaultRequestOptions().getTimeoutIntervalInMs(), nullValue());
final CloudBlobClient client3 = azureStorageService.client("azure3").v1();
assertThat(client3.getDefaultRequestOptions().getTimeoutIntervalInMs(), is(30 * 1000));
assertThat(azureStorageService.getBlobRequestTimeout("azure1"), nullValue());
assertThat(azureStorageService.getBlobRequestTimeout("azure3"), is(Duration.ofSeconds(30)));
}
public void testGetSelectedClientNoTimeout() {
final AzureStorageService azureStorageService = storageServiceWithSettingsValidation(buildSettings());
final CloudBlobClient client1 = azureStorageService.client("azure1").v1();
assertThat(client1.getDefaultRequestOptions().getTimeoutIntervalInMs(), is(nullValue()));
assertThat(azureStorageService.getBlobRequestTimeout("azure1"), nullValue());
}
public void testGetSelectedClientBackoffPolicy() {
final AzureStorageService azureStorageService = storageServiceWithSettingsValidation(buildSettings());
final CloudBlobClient client1 = azureStorageService.client("azure1").v1();
assertThat(client1.getDefaultRequestOptions().getRetryPolicyFactory(), is(notNullValue()));
assertThat(client1.getDefaultRequestOptions().getRetryPolicyFactory(), instanceOf(RetryExponentialRetry.class));
final BlobServiceClient client1 = azureStorageService.client("azure1").v1();
assertThat(requestRetryOptions(client1), is(notNullValue()));
}
public void testGetSelectedClientBackoffPolicyNbRetries() {
@ -227,9 +231,8 @@ public class AzureStorageServiceTests extends OpenSearchTestCase {
.build();
final AzureStorageService azureStorageService = storageServiceWithSettingsValidation(timeoutSettings);
final CloudBlobClient client1 = azureStorageService.client("azure1").v1();
assertThat(client1.getDefaultRequestOptions().getRetryPolicyFactory(), is(notNullValue()));
assertThat(client1.getDefaultRequestOptions().getRetryPolicyFactory(), instanceOf(RetryExponentialRetry.class));
final BlobServiceClient client1 = azureStorageService.client("azure1").v1();
assertThat(requestRetryOptions(client1), is(notNullValue()));
}
public void testNoProxy() {
@ -368,6 +371,36 @@ public class AzureStorageServiceTests extends OpenSearchTestCase {
}
private static String encodeKey(final String value) {
return Base64.encode(value.getBytes(StandardCharsets.UTF_8));
return Base64.getEncoder().encodeToString(value.getBytes(StandardCharsets.UTF_8));
}
private static RequestRetryPolicy requestRetryOptions(BlobServiceClient client) {
for (int i = 0; i < client.getHttpPipeline().getPolicyCount(); ++i) {
final HttpPipelinePolicy policy = client.getHttpPipeline().getPolicy(i);
if (policy instanceof RequestRetryPolicy) {
return (RequestRetryPolicy)policy;
}
}
return null;
}
/**
* Extract the blob name from a URI like https://myservice.azure.net/container/path/to/myfile
* It should remove the container part (first part of the path) and gives path/to/myfile
* @param uri URI to parse
* @return The blob name relative to the container
*/
private static String blobNameFromUri(URI uri) {
final String path = uri.getPath();
// We remove the container name from the path
        // The 3 magic number comes from the fact that if path is /container/path/to/myfile
        // First occurrence is empty "/"
        // Second occurrence is "container"
// Last part contains "path/to/myfile" which is what we want to get
final String[] splits = path.split("/", 3);
// We return the remaining end of the string
return splits[2];
}
}

View File

@ -87,6 +87,7 @@ public class AzureHttpHandler implements HttpHandler {
final String blockId = params.get("blockid");
blobs.put(blockId, Streams.readFully(exchange.getRequestBody()));
exchange.getResponseHeaders().add("x-ms-request-server-encrypted", "false");
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
} else if (Regex.simpleMatch("PUT /" + container + "/*comp=blocklist*", request)) {
@ -104,6 +105,7 @@ public class AzureHttpHandler implements HttpHandler {
block.writeTo(blob);
}
blobs.put(exchange.getRequestURI().getPath(), new BytesArray(blob.toByteArray()));
exchange.getResponseHeaders().add("x-ms-request-server-encrypted", "false");
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
} else if (Regex.simpleMatch("PUT /" + container + "/*", request)) {
@ -117,6 +119,7 @@ public class AzureHttpHandler implements HttpHandler {
} else {
blobs.put(exchange.getRequestURI().getPath(), Streams.readFully(exchange.getRequestBody()));
}
exchange.getResponseHeaders().add("x-ms-request-server-encrypted", "false");
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
} else if (Regex.simpleMatch("HEAD /" + container + "/*", request)) {
@ -126,8 +129,9 @@ public class AzureHttpHandler implements HttpHandler {
sendError(exchange, RestStatus.NOT_FOUND);
return;
}
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(blob.length()));
exchange.getResponseHeaders().add("Content-Length", String.valueOf(blob.length()));
exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
exchange.getResponseHeaders().add("x-ms-request-server-encrypted", "false");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
} else if (Regex.simpleMatch("GET /" + container + "/*", request)) {
@ -149,8 +153,9 @@ public class AzureHttpHandler implements HttpHandler {
final int length = Integer.parseInt(matcher.group(2)) - start + 1;
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(length));
exchange.getResponseHeaders().add("Content-Length", String.valueOf(length));
exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
exchange.getResponseHeaders().add("x-ms-request-server-encrypted", "false");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), length);
exchange.getResponseBody().write(blob.toBytesRef().bytes, start, length);
@ -169,6 +174,7 @@ public class AzureHttpHandler implements HttpHandler {
list.append("<EnumerationResults>");
final String prefix = params.get("prefix");
final Set<String> blobPrefixes = new HashSet<>();
// Always use the delimiter (explicit or implicit), some APIs do not send it anymore
final String delimiter = params.get("delimiter");
if (delimiter != null) {
list.append("<Delimiter>").append(delimiter).append("</Delimiter>");
@ -187,9 +193,15 @@ public class AzureHttpHandler implements HttpHandler {
continue;
}
}
list.append("<Blob><Name>").append(blobPath).append("</Name>");
list.append("<Blob>");
if (delimiter != null) {
list.append("<IsPrefix>").append(blobPath.endsWith(delimiter)).append("</IsPrefix>");
}
list.append("<Name>").append(blobPath).append("</Name>");
list.append("<Properties><Content-Length>").append(blob.getValue().length()).append("</Content-Length>");
list.append("<BlobType>BlockBlob</BlobType></Properties></Blob>");
list.append("<BlobType>BlockBlob</BlobType></Properties>");
list.append("</Blob>");
}
if (blobPrefixes.isEmpty() == false) {
blobPrefixes.forEach(p -> list.append("<BlobPrefix><Name>").append(p).append("</Name></BlobPrefix>"));
@ -200,6 +212,7 @@ public class AzureHttpHandler implements HttpHandler {
byte[] response = list.toString().getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/xml");
exchange.getResponseHeaders().add("x-ms-request-server-encrypted", "false");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);