[Remote Store] Upload segments to remote store post refresh (#3460)

* Add RemoteDirectory interface to copy segment files to/from remote store

Signed-off-by: Sachin Kale <kalsac@amazon.com>

Co-authored-by: Sachin Kale <kalsac@amazon.com>

* Add index level setting for remote store

Signed-off-by: Sachin Kale <kalsac@amazon.com>

Co-authored-by: Sachin Kale <kalsac@amazon.com>

* Add RemoteDirectoryFactory and use RemoteDirectory instance in RefreshListener

Co-authored-by: Sachin Kale <kalsac@amazon.com>
Signed-off-by: Sachin Kale <kalsac@amazon.com>

* Upload segment to remote store post refresh

Signed-off-by: Sachin Kale <kalsac@amazon.com>

Co-authored-by: Sachin Kale <kalsac@amazon.com>
Authored by Sachin Kale, committed via GitHub on 2022-06-10 11:02:53 +05:30 (commit fb13759876, parent fc541544be).
24 changed files with 1176 additions and 12 deletions
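Taken together, the changes gate a new final index setting behind an experimental feature flag. A minimal sketch of how an index opts in (the helper class is hypothetical; in practice the flag is supplied as a JVM system property on every node):

    import org.opensearch.cluster.metadata.IndexMetadata;
    import org.opensearch.common.settings.Settings;
    import org.opensearch.common.util.FeatureFlags;

    final class RemoteStoreOptIn {
        static Settings remoteStoreIndexSettings() {
            // The experimental flag must be enabled, or index.remote_store is never registered.
            System.setProperty(FeatureFlags.REMOTE_STORE, "true");
            return Settings.builder()
                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
                // Final setting: it can only be set at index creation time, never updated.
                .put(IndexMetadata.SETTING_REMOTE_STORE, true)
                .build();
        }
    }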

View File

@ -675,7 +675,8 @@ public class IndexShardIT extends OpenSearchSingleNodeTestCase {
() -> {},
RetentionLeaseSyncer.EMPTY,
cbs,
- SegmentReplicationCheckpointPublisher.EMPTY
+ SegmentReplicationCheckpointPublisher.EMPTY,
+ null
);
}

View File

@ -283,6 +283,17 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
Property.Final
);
public static final String SETTING_REMOTE_STORE = "index.remote_store";
/**
* Used to specify if the index data should be persisted in the remote store.
*/
public static final Setting<Boolean> INDEX_REMOTE_STORE_SETTING = Setting.boolSetting(
SETTING_REMOTE_STORE,
false,
Property.IndexScope,
Property.Final
);
public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
public static final Setting<AutoExpandReplicas> INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING;
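The new setting can be read back in a typed way through the Setting object; a small sketch (the helper class is hypothetical):

    import org.opensearch.cluster.metadata.IndexMetadata;

    final class RemoteStoreSettingReader {
        // Typed read of the final setting; falls back to the declared default (false).
        static boolean isRemoteStoreEnabled(IndexMetadata indexMetadata) {
            return IndexMetadata.INDEX_REMOTE_STORE_SETTING.get(indexMetadata.getSettings());
        }
    }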

View File

@ -217,7 +217,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
*/
public static final Map<String, Setting> FEATURE_FLAGGED_INDEX_SETTINGS = Map.of(
FeatureFlags.REPLICATION_TYPE,
- IndexMetadata.INDEX_REPLICATION_TYPE_SETTING
+ IndexMetadata.INDEX_REPLICATION_TYPE_SETTING,
+ FeatureFlags.REMOTE_STORE,
+ IndexMetadata.INDEX_REMOTE_STORE_SETTING
);
public static final IndexScopedSettings DEFAULT_SCOPED_SETTINGS = new IndexScopedSettings(Settings.EMPTY, BUILT_IN_INDEX_SETTINGS);

View File

@ -23,6 +23,12 @@ public class FeatureFlags {
*/
public static final String REPLICATION_TYPE = "opensearch.experimental.feature.replication_type.enabled";
/**
* Gates the visibility of the index setting that allows persisting data to remote store along with local disk.
* Once the feature is ready for production release, this feature flag can be removed.
*/
public static final String REMOTE_STORE = "opensearch.experimental.feature.remote_store.enabled";
/**
* Used to test feature flags whose values are expected to be booleans.
* This method returns true if the value is "true" (case-insensitive),
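The flag is read from a JVM system property, as the updated FeatureFlagTests below exercise; a minimal check (the helper class is hypothetical):

    import org.opensearch.common.util.FeatureFlags;

    final class RemoteStoreFlagCheck {
        static boolean remoteStoreAvailable() {
            // isEnabled returns true only if the property value is "true" (case-insensitive),
            // typically passed as -Dopensearch.experimental.feature.remote_store.enabled=true.
            return FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE);
        }
    }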

View File

@ -70,6 +70,7 @@ import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.RemoteDirectoryFactory;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
@ -118,6 +119,8 @@ public final class IndexModule {
private static final FsDirectoryFactory DEFAULT_DIRECTORY_FACTORY = new FsDirectoryFactory();
private static final RemoteDirectoryFactory REMOTE_DIRECTORY_FACTORY = new RemoteDirectoryFactory();
private static final IndexStorePlugin.RecoveryStateFactory DEFAULT_RECOVERY_STATE_FACTORY = RecoveryState::new;
public static final Setting<String> INDEX_STORE_TYPE_SETTING = new Setting<>(
@ -516,6 +519,7 @@ public final class IndexModule {
client,
queryCache,
directoryFactory,
REMOTE_DIRECTORY_FACTORY,
eventListener,
readerWrapperFactory,
mapperRegistry,

View File

@ -81,6 +81,7 @@ import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.RemoteStoreRefreshListener;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardNotFoundException;
@ -96,6 +97,9 @@ import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.threadpool.ThreadPool;
@ -136,6 +140,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final NodeEnvironment nodeEnv;
private final ShardStoreDeleter shardStoreDeleter;
private final IndexStorePlugin.DirectoryFactory directoryFactory;
private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory;
private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory;
private final CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper;
private final IndexCache indexCache;
@ -190,6 +195,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
Client client,
QueryCache queryCache,
IndexStorePlugin.DirectoryFactory directoryFactory,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
IndexEventListener eventListener,
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> wrapperFactory,
MapperRegistry mapperRegistry,
@ -260,6 +266,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
this.eventListener = eventListener;
this.nodeEnv = nodeEnv;
this.directoryFactory = directoryFactory;
this.remoteDirectoryFactory = remoteDirectoryFactory;
this.recoveryStateFactory = recoveryStateFactory;
this.engineFactory = Objects.requireNonNull(engineFactory);
this.engineConfigFactory = Objects.requireNonNull(engineConfigFactory);
@ -430,7 +437,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
final ShardRouting routing,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
- final SegmentReplicationCheckpointPublisher checkpointPublisher
+ final SegmentReplicationCheckpointPublisher checkpointPublisher,
+ final RepositoriesService repositoriesService
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
@ -504,6 +512,21 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
}
};
Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
Directory remoteDirectory = null;
RemoteStoreRefreshListener remoteStoreRefreshListener = null;
if (this.indexSettings.isRemoteStoreEnabled()) {
try {
Repository repository = repositoriesService.repository(clusterService.state().metadata().clusterUUID());
remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path, repository);
remoteStoreRefreshListener = new RemoteStoreRefreshListener(directory, remoteDirectory);
} catch (RepositoryMissingException e) {
throw new IllegalArgumentException(
"Repository should be created before creating index with remote_store enabled setting",
e
);
}
}
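// Note: the repository is resolved by a name equal to the cluster UUID, so a repository
// registered under that exact name must already exist when a remote-store-enabled index is created.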
store = new Store(
shardId,
this.indexSettings,
@ -533,7 +556,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService,
- this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null
+ this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null,
+ remoteStoreRefreshListener
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);

View File

@ -534,6 +534,7 @@ public final class IndexSettings {
private final Settings nodeSettings;
private final int numberOfShards;
private final ReplicationType replicationType;
private final boolean isRemoteStoreEnabled;
// volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock
private volatile Settings settings;
private volatile IndexMetadata indexMetadata;
@ -686,6 +687,7 @@ public final class IndexSettings {
this.indexMetadata = indexMetadata;
numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null);
replicationType = ReplicationType.parseString(settings.get(IndexMetadata.SETTING_REPLICATION_TYPE));
isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE, false);
this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
@ -927,6 +929,13 @@ public final class IndexSettings {
return ReplicationType.SEGMENT.equals(replicationType);
}
/**
* Returns if remote store is enabled for this index.
*/
public boolean isRemoteStoreEnabled() {
return isRemoteStoreEnabled;
}
/**
* Returns the node settings. The settings returned from {@link #getSettings()} are a merged version of the
* index settings and the node settings where node settings are overwritten by index settings.

View File

@ -304,6 +304,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private volatile boolean useRetentionLeasesInPeerRecovery;
private final ReferenceManager.RefreshListener checkpointRefreshListener;
private final RemoteStoreRefreshListener remoteStoreRefreshListener;
public IndexShard(
final ShardRouting shardRouting,
final IndexSettings indexSettings,
@ -325,7 +327,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
final Runnable globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final CircuitBreakerService circuitBreakerService,
- @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher
+ @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
+ @Nullable final RemoteStoreRefreshListener remoteStoreRefreshListener
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
@ -413,6 +416,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
} else {
this.checkpointRefreshListener = null;
}
this.remoteStoreRefreshListener = remoteStoreRefreshListener;
}
public ThreadPool getThreadPool() {
@ -3139,11 +3143,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
};
- final List<ReferenceManager.RefreshListener> internalRefreshListener;
+ final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
+ internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
+ if (remoteStoreRefreshListener != null && shardRouting.primary()) {
+     internalRefreshListener.add(remoteStoreRefreshListener);
+ }
if (this.checkpointRefreshListener != null) {
-     internalRefreshListener = Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener);
- } else {
-     internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric));
+     internalRefreshListener.add(checkpointRefreshListener);
}
return this.engineConfigFactory.newEngineConfig(

View File

@ -0,0 +1,87 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.shard;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
/**
* RefreshListener implementation to upload newly created segment files to the remote store
*/
public class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener {
private final Directory storeDirectory;
private final Directory remoteDirectory;
// ToDo: This can be a map with metadata of the uploaded file as value of the map (GitHub #3398)
private final Set<String> filesUploadedToRemoteStore;
private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class);
public RemoteStoreRefreshListener(Directory storeDirectory, Directory remoteDirectory) throws IOException {
this.storeDirectory = storeDirectory;
this.remoteDirectory = remoteDirectory;
// ToDo: Handle failures in reading list of files (GitHub #3397)
this.filesUploadedToRemoteStore = new HashSet<>(Arrays.asList(remoteDirectory.listAll()));
}
@Override
public void beforeRefresh() throws IOException {
// Do Nothing
}
/**
* Upload new segment files created as part of the last refresh to the remote segment store.
* The method also deletes segment files from the remote store that are no longer present on the local filesystem.
* @param didRefresh true if the refresh opened a new reference
* @throws IOException in case of I/O error in reading list of local files
*/
@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh) {
Set<String> localFiles = Set.of(storeDirectory.listAll());
localFiles.stream().filter(file -> !filesUploadedToRemoteStore.contains(file)).forEach(file -> {
try {
remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT);
filesUploadedToRemoteStore.add(file);
} catch (NoSuchFileException e) {
logger.info(
() -> new ParameterizedMessage("The file {} does not exist anymore. It can happen in case of temp files", file),
e
);
} catch (IOException e) {
// ToDo: Handle transient and permanent unavailability of the remote store (GitHub #3397)
logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e);
}
});
Set<String> remoteFilesToBeDeleted = new HashSet<>();
// ToDo: Instead of deleting files in sync, mark them and delete in async/periodic flow (GitHub #3142)
filesUploadedToRemoteStore.stream().filter(file -> !localFiles.contains(file)).forEach(file -> {
try {
remoteDirectory.deleteFile(file);
remoteFilesToBeDeleted.add(file);
} catch (IOException e) {
// ToDo: Handle transient and permanent unavailability of the remote store (GitHub #3397)
logger.warn(() -> new ParameterizedMessage("Exception while deleting file {} from the remote segment store", file), e);
}
});
remoteFilesToBeDeleted.forEach(filesUploadedToRemoteStore::remove);
}
}
}
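In production the listener is registered with the engine's refresh machinery (see the IndexShard change above); a standalone sketch of one reconciliation pass between two directories (class and method names are illustrative):

    import org.apache.lucene.store.Directory;
    import org.opensearch.index.shard.RemoteStoreRefreshListener;

    import java.io.IOException;

    final class RefreshSyncSketch {
        static void syncOnce(Directory local, Directory remote) throws IOException {
            RemoteStoreRefreshListener listener = new RemoteStoreRefreshListener(local, remote);
            // true mimics a refresh that opened a new reference: new local segment files are
            // uploaded, and remote files no longer present locally are deleted.
            listener.afterRefresh(true);
        }
    }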

View File

@ -0,0 +1,193 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.store;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
/**
* A {@code RemoteDirectory} provides an abstraction layer for storing a list of files to a remote store.
* A RemoteDirectory contains only files (no sub-folder hierarchy). This class does not support all the methods of
* the Directory interface. Currently, it implements only the methods used to copy files to/from the remote store.
* The remaining methods will be implemented as the remote store is integrated with replication, peer recovery, etc.
*
* @opensearch.internal
*/
public final class RemoteDirectory extends Directory {
private final BlobContainer blobContainer;
public RemoteDirectory(BlobContainer blobContainer) {
this.blobContainer = blobContainer;
}
/**
* Returns names of all files stored in this directory. The output must be in sorted (UTF-16,
* java's {@link String#compareTo}) order.
*/
@Override
public String[] listAll() throws IOException {
return blobContainer.listBlobs().keySet().stream().sorted().toArray(String[]::new);
}
/**
* Removes an existing file in the directory.
*
* <p>This method will not throw an exception when the file doesn't exist and simply ignores this case.
* This is a deviation from the {@code Directory} interface where it is expected to throw either
* {@link NoSuchFileException} or {@link FileNotFoundException} if {@code name} points to a non-existing file.
*
* @param name the name of an existing file.
* @throws IOException if the file exists but could not be deleted.
*/
@Override
public void deleteFile(String name) throws IOException {
// ToDo: Add a check for file existence
blobContainer.deleteBlobsIgnoringIfNotExists(Collections.singletonList(name));
}
/**
* Creates and returns a new instance of {@link RemoteIndexOutput} which will be used to copy files to the remote
* store.
*
* <p> In the {@link Directory} interface, it is expected to throw {@link java.nio.file.FileAlreadyExistsException}
* if the file already exists in the remote store. As this method does not open a file, it does not throw the
* exception.
*
* @param name the name of the file to copy to remote store.
*/
@Override
public IndexOutput createOutput(String name, IOContext context) {
return new RemoteIndexOutput(name, blobContainer);
}
/**
* Opens a stream for reading an existing file and returns {@link RemoteIndexInput} enclosing the stream.
*
* @param name the name of an existing file.
* @throws IOException in case of I/O error
* @throws NoSuchFileException if the file does not exist
*/
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
return new RemoteIndexInput(name, blobContainer.readBlob(name), fileLength(name));
}
/**
* Closes the directory by deleting all the files in this directory
*/
@Override
public void close() throws IOException {
blobContainer.delete();
}
/**
* Returns the byte length of a file in the directory.
*
* @param name the name of an existing file.
* @throws IOException in case of I/O error
* @throws NoSuchFileException if the file does not exist
*/
@Override
public long fileLength(String name) throws IOException {
// ToDo: Instead of calling remote store each time, keep a cache with segment metadata
Map<String, BlobMetadata> metadata = blobContainer.listBlobsByPrefix(name);
if (metadata.containsKey(name)) {
return metadata.get(name).length();
}
throw new NoSuchFileException(name);
}
/**
* Guaranteed to throw an exception and leave the directory unmodified.
* Once soft deletion of segment files in the remote store is supported, this method will report the
* number of files marked as deleted but not yet removed from the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public Set<String> getPendingDeletions() throws IOException {
throw new UnsupportedOperationException();
}
/**
* Guaranteed to throw an exception and leave the directory unmodified.
* A temporary IndexOutput is not required when working with the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) {
throw new UnsupportedOperationException();
}
/**
* Guaranteed to throw an exception and leave the directory unmodified.
* Segment upload to the remote store is permanent and does not require a separate sync API.
* This may change in the future if segments are uploaded to the remote store via a cache, in which case a
* sync API would be needed to persist the cache contents to the store.
*
* @throws UnsupportedOperationException always
*/
@Override
public void sync(Collection<String> names) throws IOException {
throw new UnsupportedOperationException();
}
/**
* Guaranteed to throw an exception and leave the directory unmodified.
* Once the metadata to be stored with each shard is finalized, this method will be used to sync the directory
* metadata to the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public void syncMetaData() {
throw new UnsupportedOperationException();
}
/**
* Guaranteed to throw an exception and leave the directory unmodified.
* Because IndexWriter uses this method to publish commits, it will need to be implemented once IndexWriter
* is backed by a RemoteDirectory.
*
* @throws UnsupportedOperationException always
*/
@Override
public void rename(String source, String dest) throws IOException {
throw new UnsupportedOperationException();
}
/**
* Guaranteed to throw an exception and leave the directory unmodified.
* Once locking of segment files in the remote store is supported, this method will be implemented with a
* remote store specific LockFactory.
*
* @throws UnsupportedOperationException always
*/
@Override
public Lock obtainLock(String name) throws IOException {
throw new UnsupportedOperationException();
}
}
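Uploads go through the inherited Directory.copyFrom, which calls createOutput and then streams the file via RemoteIndexOutput.copyBytes; a sketch assuming a local FSDirectory (the path is illustrative):

    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;
    import org.apache.lucene.store.IOContext;
    import org.opensearch.common.blobstore.BlobContainer;
    import org.opensearch.index.store.RemoteDirectory;

    import java.io.IOException;
    import java.nio.file.Paths;

    final class SegmentUploadSketch {
        static void upload(BlobContainer container, String fileName) throws IOException {
            Directory local = FSDirectory.open(Paths.get("/tmp/shard-0/index")); // illustrative path
            Directory remote = new RemoteDirectory(container);
            // copyFrom opens fileName in the local directory and streams it to
            // BlobContainer.writeBlob via RemoteIndexOutput.copyBytes.
            remote.copyFrom(local, fileName, fileName, IOContext.DEFAULT);
        }
    }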

View File

@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.store;
import org.apache.lucene.store.Directory;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import java.io.IOException;
/**
* Factory for a remote store directory
*
* @opensearch.internal
*/
public class RemoteDirectoryFactory implements IndexStorePlugin.RemoteDirectoryFactory {
@Override
public Directory newDirectory(IndexSettings indexSettings, ShardPath path, Repository repository) throws IOException {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobPath blobPath = new BlobPath();
blobPath = blobPath.add(indexSettings.getIndex().getName()).add(String.valueOf(path.getShardId().getId()));
BlobContainer blobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(blobPath);
return new RemoteDirectory(blobContainer);
}
}

View File

@ -0,0 +1,85 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.store;
import org.apache.lucene.store.IndexInput;
import java.io.IOException;
import java.io.InputStream;
/**
* Class for input from a file in a {@link RemoteDirectory}. Used for all input operations from the remote store.
* Currently, only methods from {@link IndexInput} that are required for reading a file from remote store are
* implemented. Remaining methods will be implemented as we open up remote store for other use cases like replication,
* peer recovery etc.
* ToDo: Extend ChecksumIndexInput
* @see RemoteDirectory
*
* @opensearch.internal
*/
public class RemoteIndexInput extends IndexInput {
private final InputStream inputStream;
private final long size;
public RemoteIndexInput(String name, InputStream inputStream, long size) {
super(name);
this.inputStream = inputStream;
this.size = size;
}
@Override
public byte readByte() throws IOException {
byte[] buffer = new byte[1];
inputStream.read(buffer);
return buffer[0];
}
@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
inputStream.read(b, offset, len);
}
@Override
public void close() throws IOException {
inputStream.close();
}
@Override
public long length() {
return size;
}
@Override
public void seek(long pos) throws IOException {
inputStream.skip(pos);
}
/**
* Guaranteed to throw an exception and leave the RemoteIndexInput unmodified.
* This method is not implemented as it is not used for the file transfer to/from the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public long getFilePointer() {
throw new UnsupportedOperationException();
}
/**
* Guaranteed to throw an exception and leave the RemoteIndexInput unmodified.
* This method is not implemented as it is not used for the file transfer to/from the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
throw new UnsupportedOperationException();
}
}
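Reads are strictly sequential, since getFilePointer and slice throw; a sketch that pulls a whole file back, assuming the underlying stream returns all requested bytes in one call, as the current readBytes implementation expects:

    import org.apache.lucene.store.IOContext;
    import org.apache.lucene.store.IndexInput;
    import org.opensearch.index.store.RemoteDirectory;

    import java.io.IOException;

    final class SegmentDownloadSketch {
        static byte[] download(RemoteDirectory remote, String fileName) throws IOException {
            // try-with-resources closes the RemoteIndexInput and its underlying InputStream
            try (IndexInput in = remote.openInput(fileName, IOContext.DEFAULT)) {
                byte[] data = new byte[(int) in.length()];
                in.readBytes(data, 0, data.length);
                return data;
            }
        }
    }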

View File

@ -0,0 +1,99 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.store;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.lucene.store.InputStreamIndexInput;
import java.io.IOException;
/**
* Class for output to a file in a {@link RemoteDirectory}. Used for all output operations to the remote store.
* Currently, only methods from {@link IndexOutput} that are required for uploading a segment file to remote store are
* implemented. Remaining methods will be implemented as we open up remote store for other use cases like replication,
* peer recovery etc.
* ToDo: add checksum support, analogous to the ChecksumIndexInput ToDo on the read side
* @see RemoteDirectory
*
* @opensearch.internal
*/
public class RemoteIndexOutput extends IndexOutput {
private final BlobContainer blobContainer;
public RemoteIndexOutput(String name, BlobContainer blobContainer) {
super(name, name);
this.blobContainer = blobContainer;
}
@Override
public void copyBytes(DataInput input, long numBytes) throws IOException {
assert input instanceof IndexInput : "input should be instance of IndexInput";
blobContainer.writeBlob(getName(), new InputStreamIndexInput((IndexInput) input, numBytes), numBytes, false);
}
/**
* This is a no-op. Once the segment file upload to the remote store is complete, the stream does not need to
* be closed explicitly; that is taken care of by the internal APIs of the remote store client.
*/
@Override
public void close() throws IOException {
// do nothing
}
/**
* Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified.
* This method is not implemented as it is not used for the file transfer to/from the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public void writeByte(byte b) throws IOException {
throw new UnsupportedOperationException();
}
/**
* Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified.
* This method is not implemented as it is not used for the file transfer to/from the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public void writeBytes(byte[] byteArray, int offset, int length) throws IOException {
throw new UnsupportedOperationException();
}
/**
* Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified.
* This method is not implemented as it is not used for the file transfer to/from the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public long getFilePointer() {
throw new UnsupportedOperationException();
}
/**
* Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified.
* This method is not implemented as it is not directly used for file transfer to/from the remote store.
* However, the checksum is important for verifying data integrity, so this method will eventually need to be
* implemented for segment upload as well.
*
* @throws UnsupportedOperationException always
*/
@Override
public long getChecksum() throws IOException {
throw new UnsupportedOperationException();
}
}

View File

@ -859,7 +859,13 @@ public class IndicesService extends AbstractLifecycleComponent
IndexService indexService = indexService(shardRouting.index());
assert indexService != null;
RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode);
- IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher);
+ IndexShard indexShard = indexService.createShard(
+     shardRouting,
+     globalCheckpointSyncer,
+     retentionLeaseSyncer,
+     checkpointPublisher,
+     repositoriesService
+ );
indexShard.addShardFailureCallback(onShardFailure);
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS

View File

@ -39,6 +39,7 @@ import org.opensearch.common.Nullable;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.repositories.Repository;
import java.io.IOException;
import java.util.Collections;
@ -66,6 +67,22 @@ public interface IndexStorePlugin {
Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException;
}
/**
* An interface that describes how to create a new remote directory instance per shard.
*/
@FunctionalInterface
interface RemoteDirectoryFactory {
/**
* Creates a new remote directory per shard. This method is called once per shard on shard creation.
* @param indexSettings the shard's index settings
* @param shardPath the path the shard is using
* @param repository the repository used to obtain the BlobContainer
* @return a new RemoteDirectory instance
* @throws IOException if an IOException occurs while opening the directory
*/
Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath, Repository repository) throws IOException;
}
/**
* The {@link DirectoryFactory} mappings for this plugin. When an index is created the store type setting
* {@link org.opensearch.index.IndexModule#INDEX_STORE_TYPE_SETTING} on the index will be examined and either use the default or a
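Since the new interface above is a @FunctionalInterface, a plugin can supply it as a lambda; a test-only stand-in that ignores the repository (hypothetical, for illustration):

    import org.apache.lucene.store.ByteBuffersDirectory;
    import org.opensearch.plugins.IndexStorePlugin;

    final class InMemoryRemoteDirectoryFactory {
        // Ignores the repository and returns an in-memory Directory; useful only in tests.
        static final IndexStorePlugin.RemoteDirectoryFactory FACTORY =
            (indexSettings, shardPath, repository) -> new ByteBuffersDirectory();
    }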

View File

@ -21,6 +21,7 @@ public class FeatureFlagTests extends OpenSearchTestCase {
@BeforeClass
public static void enableFeature() {
AccessController.doPrivileged((PrivilegedAction<String>) () -> System.setProperty(FeatureFlags.REPLICATION_TYPE, "true"));
AccessController.doPrivileged((PrivilegedAction<String>) () -> System.setProperty(FeatureFlags.REMOTE_STORE, "true"));
}
public void testReplicationTypeFeatureFlag() {
@ -40,4 +41,10 @@ public class FeatureFlagTests extends OpenSearchTestCase {
assertNotNull(System.getProperty(javaVersionProperty));
assertFalse(FeatureFlags.isEnabled(javaVersionProperty));
}
public void testRemoteStoreFeatureFlag() {
String remoteStoreFlag = FeatureFlags.REMOTE_STORE;
assertNotNull(System.getProperty(remoteStoreFlag));
assertTrue(FeatureFlags.isEnabled(remoteStoreFlag));
}
}

View File

@ -41,6 +41,7 @@ import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.translog.Translog;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.VersionUtils;
@ -56,6 +57,7 @@ import java.util.function.Function;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.core.StringContains.containsString;
import static org.hamcrest.object.HasToString.hasToString;
import static org.opensearch.common.settings.IndexScopedSettings.FEATURE_FLAGGED_INDEX_SETTINGS;
public class IndexSettingsTests extends OpenSearchTestCase {
@ -753,4 +755,41 @@ public class IndexSettingsTests extends OpenSearchTestCase {
assertThat(indexSettings.getTranslogRetentionAge().millis(), equalTo(-1L));
assertThat(indexSettings.getTranslogRetentionSize().getBytes(), equalTo(-1L));
}
public void testRemoteStoreDefaultSetting() {
IndexMetadata metadata = newIndexMeta(
"index",
Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT).build()
);
IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY);
assertFalse(settings.isRemoteStoreEnabled());
}
public void testRemoteStoreExplicitSetting() {
IndexMetadata metadata = newIndexMeta(
"index",
Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_REMOTE_STORE, true)
.build()
);
IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY);
assertTrue(settings.isRemoteStoreEnabled());
}
public void testUpdateRemoteStoreFails() {
Set<Setting<?>> remoteStoreSettingSet = new HashSet<>();
remoteStoreSettingSet.add(FEATURE_FLAGGED_INDEX_SETTINGS.get(FeatureFlags.REMOTE_STORE));
IndexScopedSettings settings = new IndexScopedSettings(Settings.EMPTY, remoteStoreSettingSet);
IllegalArgumentException error = expectThrows(
IllegalArgumentException.class,
() -> settings.updateSettings(
Settings.builder().put("index.remote_store", randomBoolean()).build(),
Settings.builder(),
Settings.builder(),
"index"
)
);
assertEquals(error.getMessage(), "final index setting [index.remote_store], not updateable");
}
}

View File

@ -0,0 +1,139 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.shard;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.opensearch.test.OpenSearchTestCase;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.doThrow;
public class RemoteStoreRefreshListenerTests extends OpenSearchTestCase {
private Directory storeDirectory;
private Directory remoteDirectory;
private RemoteStoreRefreshListener remoteStoreRefreshListener;
public void setup(String[] remoteFiles) throws IOException {
storeDirectory = mock(Directory.class);
remoteDirectory = mock(Directory.class);
when(remoteDirectory.listAll()).thenReturn(remoteFiles);
remoteStoreRefreshListener = new RemoteStoreRefreshListener(storeDirectory, remoteDirectory);
}
public void testAfterRefreshFalse() throws IOException {
setup(new String[0]);
remoteStoreRefreshListener.afterRefresh(false);
verify(storeDirectory, times(0)).listAll();
}
public void testAfterRefreshTrueNoLocalFiles() throws IOException {
setup(new String[0]);
when(storeDirectory.listAll()).thenReturn(new String[0]);
remoteStoreRefreshListener.afterRefresh(true);
verify(storeDirectory).listAll();
verify(remoteDirectory, times(0)).copyFrom(any(), any(), any(), any());
verify(remoteDirectory, times(0)).deleteFile(any());
}
public void testAfterRefreshOnlyUploadFiles() throws IOException {
setup(new String[0]);
String[] localFiles = new String[] { "segments_1", "0.si", "0.cfs", "0.cfe" };
when(storeDirectory.listAll()).thenReturn(localFiles);
remoteStoreRefreshListener.afterRefresh(true);
verify(storeDirectory).listAll();
verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT);
verify(remoteDirectory).copyFrom(storeDirectory, "0.si", "0.si", IOContext.DEFAULT);
verify(remoteDirectory).copyFrom(storeDirectory, "0.cfs", "0.cfs", IOContext.DEFAULT);
verify(remoteDirectory).copyFrom(storeDirectory, "0.cfe", "0.cfe", IOContext.DEFAULT);
verify(remoteDirectory, times(0)).deleteFile(any());
}
public void testAfterRefreshOnlyUploadAndDelete() throws IOException {
setup(new String[] { "0.si", "0.cfs" });
String[] localFiles = new String[] { "segments_1", "1.si", "1.cfs", "1.cfe" };
when(storeDirectory.listAll()).thenReturn(localFiles);
remoteStoreRefreshListener.afterRefresh(true);
verify(storeDirectory).listAll();
verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT);
verify(remoteDirectory).copyFrom(storeDirectory, "1.si", "1.si", IOContext.DEFAULT);
verify(remoteDirectory).copyFrom(storeDirectory, "1.cfs", "1.cfs", IOContext.DEFAULT);
verify(remoteDirectory).copyFrom(storeDirectory, "1.cfe", "1.cfe", IOContext.DEFAULT);
verify(remoteDirectory).deleteFile("0.si");
verify(remoteDirectory).deleteFile("0.cfs");
}
public void testAfterRefreshOnlyDelete() throws IOException {
setup(new String[] { "0.si", "0.cfs" });
String[] localFiles = new String[] { "0.si" };
when(storeDirectory.listAll()).thenReturn(localFiles);
remoteStoreRefreshListener.afterRefresh(true);
verify(storeDirectory).listAll();
verify(remoteDirectory, times(0)).copyFrom(any(), any(), any(), any());
verify(remoteDirectory).deleteFile("0.cfs");
}
public void testAfterRefreshTempLocalFile() throws IOException {
setup(new String[0]);
String[] localFiles = new String[] { "segments_1", "0.si", "0.cfs.tmp" };
when(storeDirectory.listAll()).thenReturn(localFiles);
doThrow(new NoSuchFileException("0.cfs.tmp")).when(remoteDirectory)
.copyFrom(storeDirectory, "0.cfs.tmp", "0.cfs.tmp", IOContext.DEFAULT);
remoteStoreRefreshListener.afterRefresh(true);
verify(storeDirectory).listAll();
verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT);
verify(remoteDirectory).copyFrom(storeDirectory, "0.si", "0.si", IOContext.DEFAULT);
verify(remoteDirectory, times(0)).deleteFile(any());
}
public void testAfterRefreshConsecutive() throws IOException {
setup(new String[0]);
String[] localFiles = new String[] { "segments_1", "0.si", "0.cfs", "0.cfe" };
when(storeDirectory.listAll()).thenReturn(localFiles);
doThrow(new IOException("0.cfs")).when(remoteDirectory).copyFrom(storeDirectory, "0.cfs", "0.cfe", IOContext.DEFAULT);
doThrow(new IOException("0.cfe")).when(remoteDirectory).copyFrom(storeDirectory, "0.cfe", "0.cfe", IOContext.DEFAULT);
remoteStoreRefreshListener.afterRefresh(true);
verify(storeDirectory).listAll();
verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT);
verify(remoteDirectory).copyFrom(storeDirectory, "0.si", "0.si", IOContext.DEFAULT);
verify(remoteDirectory).copyFrom(storeDirectory, "0.cfs", "0.cfs", IOContext.DEFAULT);
verify(remoteDirectory).copyFrom(storeDirectory, "0.cfe", "0.cfe", IOContext.DEFAULT);
verify(remoteDirectory, times(0)).deleteFile(any());
String[] localFilesSecondRefresh = new String[] { "segments_1", "0.cfs", "1.cfs", "1.cfe" };
when(storeDirectory.listAll()).thenReturn(localFilesSecondRefresh);
remoteStoreRefreshListener.afterRefresh(true);
verify(remoteDirectory).copyFrom(storeDirectory, "0.cfs", "0.cfs", IOContext.DEFAULT);
verify(remoteDirectory).copyFrom(storeDirectory, "1.cfs", "1.cfs", IOContext.DEFAULT);
verify(remoteDirectory).copyFrom(storeDirectory, "1.cfe", "1.cfe", IOContext.DEFAULT);
verify(remoteDirectory).deleteFile("0.si");
}
}

View File

@ -0,0 +1,65 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.store;
import org.apache.lucene.store.Directory;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.IndexSettingsModule;
import org.opensearch.test.OpenSearchTestCase;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
public class RemoteDirectoryFactoryTests extends OpenSearchTestCase {
private RemoteDirectoryFactory remoteDirectoryFactory;
@Before
public void setup() {
remoteDirectoryFactory = new RemoteDirectoryFactory();
}
public void testNewDirectory() throws IOException {
Settings settings = Settings.builder().build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
Path tempDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0");
ShardPath shardPath = new ShardPath(false, tempDir, tempDir, new ShardId(indexSettings.getIndex(), 0));
BlobStoreRepository repository = mock(BlobStoreRepository.class);
BlobStore blobStore = mock(BlobStore.class);
BlobContainer blobContainer = mock(BlobContainer.class);
when(repository.blobStore()).thenReturn(blobStore);
when(blobStore.blobContainer(any())).thenReturn(blobContainer);
when(blobContainer.listBlobs()).thenReturn(Collections.emptyMap());
Directory directory = remoteDirectoryFactory.newDirectory(indexSettings, shardPath, repository);
assertTrue(directory instanceof RemoteDirectory);
ArgumentCaptor<BlobPath> blobPathCaptor = ArgumentCaptor.forClass(BlobPath.class);
verify(blobStore).blobContainer(blobPathCaptor.capture());
BlobPath blobPath = blobPathCaptor.getValue();
assertEquals("foo/0/", blobPath.buildAsString());
directory.listAll();
verify(blobContainer).listBlobs();
}
}

View File

@ -0,0 +1,158 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.store;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.junit.Before;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.support.PlainBlobMetadata;
import org.opensearch.test.OpenSearchTestCase;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.NoSuchFileException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class RemoteDirectoryTests extends OpenSearchTestCase {
private BlobContainer blobContainer;
private RemoteDirectory remoteDirectory;
@Before
public void setup() {
blobContainer = mock(BlobContainer.class);
remoteDirectory = new RemoteDirectory(blobContainer);
}
public void testListAllEmpty() throws IOException {
when(blobContainer.listBlobs()).thenReturn(Collections.emptyMap());
String[] actualFileNames = remoteDirectory.listAll();
String[] expectedFileName = new String[] {};
assertArrayEquals(expectedFileName, actualFileNames);
}
public void testListAll() throws IOException {
Map<String, BlobMetadata> fileNames = Stream.of("abc", "xyz", "pqr", "lmn", "jkl")
.collect(Collectors.toMap(filename -> filename, filename -> new PlainBlobMetadata(filename, 100)));
when(blobContainer.listBlobs()).thenReturn(fileNames);
String[] actualFileNames = remoteDirectory.listAll();
String[] expectedFileName = new String[] { "abc", "jkl", "lmn", "pqr", "xyz" };
assertArrayEquals(expectedFileName, actualFileNames);
}
public void testListAllException() throws IOException {
when(blobContainer.listBlobs()).thenThrow(new IOException("Error reading blob store"));
assertThrows(IOException.class, () -> remoteDirectory.listAll());
}
public void testDeleteFile() throws IOException {
remoteDirectory.deleteFile("segment_1");
verify(blobContainer).deleteBlobsIgnoringIfNotExists(Collections.singletonList("segment_1"));
}
public void testDeleteFileException() throws IOException {
doThrow(new IOException("Error writing to blob store")).when(blobContainer)
.deleteBlobsIgnoringIfNotExists(Collections.singletonList("segment_1"));
assertThrows(IOException.class, () -> remoteDirectory.deleteFile("segment_1"));
}
public void testCreateOutput() {
IndexOutput indexOutput = remoteDirectory.createOutput("segment_1", IOContext.DEFAULT);
assertTrue(indexOutput instanceof RemoteIndexOutput);
assertEquals("segment_1", indexOutput.getName());
}
public void testOpenInput() throws IOException {
InputStream mockInputStream = mock(InputStream.class);
when(blobContainer.readBlob("segment_1")).thenReturn(mockInputStream);
Map<String, BlobMetadata> fileInfo = new HashMap<>();
fileInfo.put("segment_1", new PlainBlobMetadata("segment_1", 100));
when(blobContainer.listBlobsByPrefix("segment_1")).thenReturn(fileInfo);
IndexInput indexInput = remoteDirectory.openInput("segment_1", IOContext.DEFAULT);
assertTrue(indexInput instanceof RemoteIndexInput);
assertEquals(100, indexInput.length());
}
public void testOpenInputIOException() throws IOException {
when(blobContainer.readBlob("segment_1")).thenThrow(new IOException("Error while reading"));
assertThrows(IOException.class, () -> remoteDirectory.openInput("segment_1", IOContext.DEFAULT));
}
public void testOpenInputNoSuchFileException() throws IOException {
InputStream mockInputStream = mock(InputStream.class);
when(blobContainer.readBlob("segment_1")).thenReturn(mockInputStream);
when(blobContainer.listBlobsByPrefix("segment_1")).thenThrow(new NoSuchFileException("segment_1"));
assertThrows(NoSuchFileException.class, () -> remoteDirectory.openInput("segment_1", IOContext.DEFAULT));
}
public void testClose() throws IOException {
remoteDirectory.close();
verify(blobContainer).delete();
}
public void testCloseIOException() throws IOException {
when(blobContainer.delete()).thenThrow(new IOException("Error while writing to blob store"));
assertThrows(IOException.class, () -> remoteDirectory.close());
}
public void testFileLength() throws IOException {
Map<String, BlobMetadata> fileInfo = new HashMap<>();
fileInfo.put("segment_1", new PlainBlobMetadata("segment_1", 100));
when(blobContainer.listBlobsByPrefix("segment_1")).thenReturn(fileInfo);
assertEquals(100, remoteDirectory.fileLength("segment_1"));
}
public void testFileLengthIOException() throws IOException {
when(blobContainer.listBlobsByPrefix("segment_1")).thenThrow(new NoSuchFileException("segment_1"));
assertThrows(IOException.class, () -> remoteDirectory.fileLength("segment_1"));
}
public void testGetPendingDeletions() {
assertThrows(UnsupportedOperationException.class, () -> remoteDirectory.getPendingDeletions());
}
public void testCreateTempOutput() {
assertThrows(UnsupportedOperationException.class, () -> remoteDirectory.createTempOutput("segment_1", "tmp", IOContext.DEFAULT));
}
public void testSync() {
assertThrows(UnsupportedOperationException.class, () -> remoteDirectory.sync(Collections.emptyList()));
}
public void testRename() {
assertThrows(UnsupportedOperationException.class, () -> remoteDirectory.rename("segment_1", "segment_2"));
}
public void testObtainLock() {
assertThrows(UnsupportedOperationException.class, () -> remoteDirectory.obtainLock("segment_1"));
}
}

View File

@ -0,0 +1,99 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.store;
import org.junit.Before;
import org.opensearch.test.OpenSearchTestCase;
import java.io.IOException;
import java.io.InputStream;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class RemoteIndexInputTests extends OpenSearchTestCase {
private static final String FILENAME = "segment_1";
private static final long FILESIZE = 200;
private InputStream inputStream;
private RemoteIndexInput remoteIndexInput;
@Before
public void setup() {
inputStream = mock(InputStream.class);
remoteIndexInput = new RemoteIndexInput(FILENAME, inputStream, FILESIZE);
}
public void testReadByte() throws IOException {
InputStream inputStream = spy(InputStream.class);
remoteIndexInput = new RemoteIndexInput(FILENAME, inputStream, FILESIZE);
when(inputStream.read()).thenReturn(10);
assertEquals(10, remoteIndexInput.readByte());
verify(inputStream).read(any());
}
public void testReadByteIOException() throws IOException {
when(inputStream.read(any())).thenThrow(new IOException("Error reading"));
assertThrows(IOException.class, () -> remoteIndexInput.readByte());
}
public void testReadBytes() throws IOException {
byte[] buffer = new byte[10];
remoteIndexInput.readBytes(buffer, 10, 20);
verify(inputStream).read(buffer, 10, 20);
}
public void testReadBytesIOException() throws IOException {
byte[] buffer = new byte[10];
when(inputStream.read(buffer, 10, 20)).thenThrow(new IOException("Error reading"));
assertThrows(IOException.class, () -> remoteIndexInput.readBytes(buffer, 10, 20));
}
public void testClose() throws IOException {
remoteIndexInput.close();
verify(inputStream).close();
}
public void testCloseIOException() throws IOException {
doThrow(new IOException("Error closing")).when(inputStream).close();
assertThrows(IOException.class, () -> remoteIndexInput.close());
}
public void testLength() {
assertEquals(FILESIZE, remoteIndexInput.length());
}
public void testSeek() throws IOException {
remoteIndexInput.seek(10);
verify(inputStream).skip(10);
}
public void testSeekIOException() throws IOException {
when(inputStream.skip(10)).thenThrow(new IOException("Error reading"));
assertThrows(IOException.class, () -> remoteIndexInput.seek(10));
}
public void testGetFilePointer() {
assertThrows(UnsupportedOperationException.class, () -> remoteIndexInput.getFilePointer());
}
public void testSlice() {
assertThrows(UnsupportedOperationException.class, () -> remoteIndexInput.slice("Slice middle", 50, 100));
}
}

View File

@ -0,0 +1,68 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.store;
import org.apache.lucene.store.IndexInput;
import org.junit.Before;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.lucene.store.InputStreamIndexInput;
import org.opensearch.test.OpenSearchTestCase;
import java.io.IOException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
public class RemoteIndexOutputTests extends OpenSearchTestCase {
private static final String FILENAME = "segment_1";
private BlobContainer blobContainer;
private RemoteIndexOutput remoteIndexOutput;
@Before
public void setup() {
blobContainer = mock(BlobContainer.class);
remoteIndexOutput = new RemoteIndexOutput(FILENAME, blobContainer);
}
public void testCopyBytes() throws IOException {
IndexInput indexInput = mock(IndexInput.class);
remoteIndexOutput.copyBytes(indexInput, 100);
verify(blobContainer).writeBlob(eq(FILENAME), any(InputStreamIndexInput.class), eq(100L), eq(false));
}
public void testCopyBytesIOException() throws IOException {
doThrow(new IOException("Error writing")).when(blobContainer)
.writeBlob(eq(FILENAME), any(InputStreamIndexInput.class), eq(100L), eq(false));
IndexInput indexInput = mock(IndexInput.class);
assertThrows(IOException.class, () -> remoteIndexOutput.copyBytes(indexInput, 100));
}
public void testWriteByte() {
byte b = 10;
assertThrows(UnsupportedOperationException.class, () -> remoteIndexOutput.writeByte(b));
}
public void testWriteBytes() {
byte[] buffer = new byte[10];
assertThrows(UnsupportedOperationException.class, () -> remoteIndexOutput.writeBytes(buffer, 50, 60));
}
public void testGetFilePointer() {
assertThrows(UnsupportedOperationException.class, () -> remoteIndexOutput.getFilePointer());
}
public void testGetChecksum() {
assertThrows(UnsupportedOperationException.class, () -> remoteIndexOutput.getChecksum());
}
}

View File

@ -153,7 +153,8 @@ public class IndicesLifecycleListenerSingleNodeTests extends OpenSearchSingleNod
newRouting,
s -> {},
RetentionLeaseSyncer.EMPTY,
- SegmentReplicationCheckpointPublisher.EMPTY
+ SegmentReplicationCheckpointPublisher.EMPTY,
+ null
);
IndexShardTestCase.updateRoutingEntry(shard, newRouting);
assertEquals(5, counter.get());

View File

@ -525,7 +525,8 @@ public abstract class IndexShardTestCase extends OpenSearchTestCase {
globalCheckpointSyncer,
retentionLeaseSyncer,
breakerService,
- checkpointPublisher
+ checkpointPublisher,
+ null
);
indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
success = true;