start work on reimplementing cloud gateway
parent e61dc78c21
commit 1fccaf06e9
@@ -55,6 +55,17 @@ public class BlobPath implements Iterable<String> {
         return new BlobPath(builder.addAll(paths).add(path).build());
     }
 
+    public String buildAsString(String separator) {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < paths.size(); i++) {
+            sb.append(paths.get(i));
+            if (i < (paths.size() - 1)) {
+                sb.append(separator);
+            }
+        }
+        return sb.toString();
+    }
+
     @Override public String toString() {
         StringBuilder sb = new StringBuilder();
         for (String path : paths) {
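Aside: the new buildAsString(separator) joins the path elements with the given separator, and it is what the cloud blob container further down uses to turn a BlobPath into a flat object key. A minimal usage sketch (the path elements are made up; the API is the one shown in the hunk above):

    BlobPath path = BlobPath.cleanPath().add("my-cluster").add("metadata");
    String key = path.buildAsString("/"); // "my-cluster/metadata"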
@@ -20,6 +20,7 @@
 package org.elasticsearch.common.blobstore.fs;
 
 import org.elasticsearch.common.blobstore.*;
+import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.io.FileSystemUtils;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -35,7 +36,7 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.*;
 /**
  * @author kimchy (shay.banon)
  */
-public class FsBlobStore implements BlobStore {
+public class FsBlobStore extends AbstractComponent implements BlobStore {
 
     private final File path;
 
@@ -44,6 +45,7 @@ public class FsBlobStore implements BlobStore {
     private final int bufferSizeInBytes;
 
     public FsBlobStore(Settings settings, File path) {
+        super(settings);
         this.path = path;
         if (!path.exists()) {
             boolean b = path.mkdirs();
@@ -35,6 +35,7 @@ import org.elasticsearch.common.xcontent.builder.BinaryXContentBuilder;
 import org.elasticsearch.gateway.Gateway;
 import org.elasticsearch.gateway.GatewayException;
 
+import javax.annotation.Nullable;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 
@@ -57,9 +58,9 @@ public abstract class BlobStoreGateway extends AbstractLifecycleComponent<Gateway
         super(settings);
     }
 
-    protected void initialize(BlobStore blobStore, ClusterName clusterName) throws IOException {
+    protected void initialize(BlobStore blobStore, ClusterName clusterName, @Nullable ByteSizeValue defaultChunkSize) throws IOException {
         this.blobStore = blobStore;
-        this.chunkSize = componentSettings.getAsBytesSize("chunk_size", null);
+        this.chunkSize = componentSettings.getAsBytesSize("chunk_size", defaultChunkSize);
         this.basePath = BlobPath.cleanPath().add(clusterName.value());
         this.metaDataBlobContainer = blobStore.immutableBlobContainer(basePath.add("metadata"));
         this.currentIndex = findLatestIndex();
@@ -47,7 +47,7 @@ public class FsGateway extends BlobStoreGateway {
         } else {
             gatewayFile = new File(location);
         }
-        initialize(new FsBlobStore(componentSettings, gatewayFile), clusterName);
+        initialize(new FsBlobStore(componentSettings, gatewayFile), clusterName, null);
     }
 
     @Override public String type() {
@@ -0,0 +1,142 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cloud.blobstore;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.elasticsearch.common.blobstore.BlobMetaData;
+import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
+import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
+import org.elasticsearch.common.collect.ImmutableMap;
+import org.elasticsearch.common.collect.Maps;
+import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.blobstore.domain.PageSet;
+import org.jclouds.blobstore.domain.StorageMetadata;
+import org.jclouds.blobstore.options.ListContainerOptions;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class AbstractCloudBlobContainer extends AbstractBlobContainer {
+
+    protected final CloudBlobStore cloudBlobStore;
+
+    protected final String cloudPath;
+
+    public AbstractCloudBlobContainer(BlobPath path, CloudBlobStore cloudBlobStore) {
+        super(path);
+        this.cloudBlobStore = cloudBlobStore;
+        this.cloudPath = path.buildAsString("/");
+    }
+
+    @Override public boolean deleteBlob(String blobName) throws IOException {
+        cloudBlobStore.sync().removeBlob(cloudBlobStore.container(), buildBlobPath(blobName));
+        return true;
+    }
+
+    @Override public boolean blobExists(String blobName) {
+        return cloudBlobStore.sync().blobExists(cloudBlobStore.container(), buildBlobPath(blobName));
+    }
+
+    @Override public void readBlob(String blobName, final ReadBlobListener listener) {
+        final ListenableFuture<? extends Blob> future = cloudBlobStore.async().getBlob(cloudBlobStore.container(), buildBlobPath(blobName));
+        future.addListener(new Runnable() {
+            @Override public void run() {
+                Blob blob;
+                try {
+                    blob = future.get();
+                } catch (InterruptedException e) {
+                    listener.onFailure(e);
+                    return;
+                } catch (ExecutionException e) {
+                    listener.onFailure(e.getCause());
+                    return;
+                }
+                byte[] buffer = new byte[cloudBlobStore.bufferSizeInBytes()];
+                InputStream is = blob.getContent();
+                try {
+                    int bytesRead;
+                    while ((bytesRead = is.read(buffer)) != -1) {
+                        listener.onPartial(buffer, 0, bytesRead);
+                    }
+                    listener.onCompleted();
+                } catch (Exception e) {
+                    try {
+                        is.close();
+                    } catch (IOException e1) {
+                        // ignore
+                    }
+                    listener.onFailure(e);
+                }
+            }
+        }, cloudBlobStore.executorService());
+    }
+
+    @Override public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException {
+        PageSet<? extends StorageMetadata> list = cloudBlobStore.sync().list(cloudBlobStore.container(), ListContainerOptions.Builder.recursive().inDirectory(blobNamePrefix));
+        ImmutableMap.Builder<String, BlobMetaData> blobs = ImmutableMap.builder();
+        for (StorageMetadata storageMetadata : list) {
+            blobs.put(storageMetadata.getName(), new PlainBlobMetaData(storageMetadata.getName(), storageMetadata.getSize(), null));
+        }
+        return blobs.build();
+    }
+
+    @Override public ImmutableMap<String, BlobMetaData> listBlobs() throws IOException {
+        PageSet<? extends StorageMetadata> list = cloudBlobStore.sync().list(cloudBlobStore.container(), ListContainerOptions.Builder.recursive());
+        ImmutableMap.Builder<String, BlobMetaData> blobs = ImmutableMap.builder();
+        for (StorageMetadata storageMetadata : list) {
+            blobs.put(storageMetadata.getName(), new PlainBlobMetaData(storageMetadata.getName(), storageMetadata.getSize(), null));
+        }
+        return blobs.build();
+    }
+
+    protected String buildBlobPath(String blobName) {
+        return cloudPath + "/" + blobName;
+    }
+
+    private Map<String, StorageMetadata> list(String container, String prefix) {
+        final Map<String, StorageMetadata> allMetaDatas = Maps.newHashMap();
+
+        String nextMarker = null;
+        while (true) {
+            ListContainerOptions options = ListContainerOptions.Builder.recursive();
+            if (prefix != null) {
+                options = options.inDirectory(prefix);
+            }
+            if (nextMarker != null) {
+                options.afterMarker(nextMarker);
+            }
+            PageSet<? extends StorageMetadata> pageSet = cloudBlobStore.sync().list(container, options);
+            for (StorageMetadata metadata : pageSet) {
+                allMetaDatas.put(metadata.getName(), metadata);
+            }
+            nextMarker = pageSet.getNextMarker();
+            if (nextMarker == null) {
+                break;
+            }
+        }
+        return allMetaDatas;
+    }
+}
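For orientation, listBlobsByPrefix above maps each jclouds StorageMetadata entry into a PlainBlobMetaData keyed by blob name. A hedged usage sketch (the prefix and the blobContainer variable are made up, and it assumes BlobMetaData exposes name() and sizeInBytes() accessors, which is not shown in this commit):

    ImmutableMap<String, BlobMetaData> blobs = blobContainer.listBlobsByPrefix("metadata-");
    for (BlobMetaData md : blobs.values()) {
        System.out.println(md.name() + " (" + md.sizeInBytes() + " bytes)");
    }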
@@ -0,0 +1,131 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cloud.blobstore;
+
+import org.elasticsearch.ElasticSearchIllegalArgumentException;
+import org.elasticsearch.common.blobstore.AppendableBlobContainer;
+import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.blobstore.BlobStore;
+import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
+import org.elasticsearch.common.blobstore.support.ImmutableAppendableBlobContainer;
+import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.jclouds.blobstore.AsyncBlobStore;
+import org.jclouds.blobstore.BlobStoreContext;
+import org.jclouds.domain.Location;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.elasticsearch.common.util.concurrent.EsExecutors.*;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class CloudBlobStore extends AbstractComponent implements BlobStore {
+
+    private final BlobStoreContext blobStoreContext;
+
+    private final String container;
+
+    private final Location location;
+
+    private final ExecutorService executorService;
+
+    private final int bufferSizeInBytes;
+
+    public CloudBlobStore(Settings settings, BlobStoreContext blobStoreContext, String container, String location) {
+        super(settings);
+        this.blobStoreContext = blobStoreContext;
+        this.container = container;
+
+        this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
+        this.executorService = Executors.newCachedThreadPool(daemonThreadFactory(settings, "cloud_blobstore"));
+
+        if (location == null) {
+            this.location = null;
+        } else {
+            Location matchedLocation = null;
+            Set<? extends Location> assignableLocations = blobStoreContext.getBlobStore().listAssignableLocations();
+            for (Location oLocation : assignableLocations) {
+                if (oLocation.getId().equals(location)) {
+                    matchedLocation = oLocation;
+                    break;
+                }
+            }
+            this.location = matchedLocation;
+            if (this.location == null) {
+                throw new ElasticSearchIllegalArgumentException("Not a valid location [" + location + "], available locations " + assignableLocations);
+            }
+        }
+        logger.debug("Using location [{}], container [{}]", this.location, this.container);
+        sync().createContainerInLocation(this.location, container);
+    }
+
+    public int bufferSizeInBytes() {
+        return this.bufferSizeInBytes;
+    }
+
+    public ExecutorService executorService() {
+        return executorService;
+    }
+
+    public String container() {
+        return this.container;
+    }
+
+    public Location location() {
+        return this.location;
+    }
+
+    public AsyncBlobStore async() {
+        return blobStoreContext.getAsyncBlobStore();
+    }
+
+    public org.jclouds.blobstore.BlobStore sync() {
+        return blobStoreContext.getBlobStore();
+    }
+
+    @Override public ImmutableBlobContainer immutableBlobContainer(BlobPath path) {
+        return new CloudImmutableBlobContainer(path, this);
+    }
+
+    @Override public AppendableBlobContainer appendableBlobContainer(BlobPath path) {
+        return new ImmutableAppendableBlobContainer(immutableBlobContainer(path));
+    }
+
+    @Override public void delete(BlobPath path) {
+        sync().deleteDirectory(container, path.buildAsString("/"));
+    }
+
+    @Override public void close() {
+        executorService.shutdown();
+        try {
+            executorService.awaitTermination(10, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            // ignore
+        }
+        executorService.shutdownNow();
+    }
+}
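A hedged wiring sketch for the new CloudBlobStore: the buffer_size value and container name are illustrative only, blobStoreContext is assumed to be a jclouds BlobStoreContext obtained elsewhere (in this commit it comes from CloudBlobStoreService.context()), and the ImmutableSettings builder is an assumption about the surrounding codebase rather than something shown in this diff:

    // hypothetical wiring, not taken from this commit
    Settings settings = ImmutableSettings.settingsBuilder()
            .put("buffer_size", "100k") // optional, matches the default above
            .build();
    CloudBlobStore store = new CloudBlobStore(settings, blobStoreContext, "my-container", null);
    ImmutableBlobContainer metaDataContainer = store.immutableBlobContainer(BlobPath.cleanPath().add("metadata"));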
@@ -0,0 +1,65 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cloud.blobstore;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
+import org.elasticsearch.common.blobstore.support.BlobStores;
+import org.jclouds.blobstore.domain.Blob;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class CloudImmutableBlobContainer extends AbstractCloudBlobContainer implements ImmutableBlobContainer {
+
+    public CloudImmutableBlobContainer(BlobPath path, CloudBlobStore cloudBlobStore) {
+        super(path, cloudBlobStore);
+    }
+
+    @Override public void writeBlob(String blobName, InputStream is, long sizeInBytes, final WriterListener listener) {
+        Blob blob = cloudBlobStore.sync().newBlob(blobName);
+        blob.setPayload(is);
+        blob.setContentLength(sizeInBytes);
+        final ListenableFuture<String> future = cloudBlobStore.async().putBlob(cloudBlobStore.container(), blob);
+        future.addListener(new Runnable() {
+            @Override public void run() {
+                try {
+                    future.get();
+                    listener.onCompleted();
+                } catch (InterruptedException e) {
+                    listener.onFailure(e);
+                } catch (ExecutionException e) {
+                    listener.onFailure(e.getCause());
+                } catch (Throwable t) {
+                    listener.onFailure(t);
+                }
+            }
+        }, cloudBlobStore.executorService());
+    }
+
+    @Override public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException {
+        BlobStores.syncWriteBlob(this, blobName, is, sizeInBytes);
+    }
+}
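The synchronous writeBlob overload simply delegates to BlobStores.syncWriteBlob, which presumably blocks on the listener-based variant above. A minimal, hypothetical caller (blob name and payload are made up; ByteArrayInputStream comes from java.io):

    byte[] data = "hello".getBytes();
    blobContainer.writeBlob("metadata-1", new ByteArrayInputStream(data), data.length);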
@@ -26,6 +26,8 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.jclouds.concurrent.config.ExecutorServiceModule;
 
+import java.util.concurrent.Executors;
+
 /**
  * @author kimchy (shay.banon)
  */
@@ -34,7 +36,7 @@ public class JCloudsUtils {
     public static Iterable<? extends Module> buildModules(Settings settings) {
         return ImmutableList.of(new JCloudsLoggingModule(settings),
                 new ExecutorServiceModule(
-                        java.util.concurrent.Executors.newCachedThreadPool(EsExecutors.daemonThreadFactory(settings, "jclouds-user")),
-                        java.util.concurrent.Executors.newCachedThreadPool(EsExecutors.daemonThreadFactory(settings, "jclouds-io"))));
+                        Executors.newCachedThreadPool(EsExecutors.daemonThreadFactory(settings, "jclouds-user")),
+                        Executors.newCachedThreadPool(EsExecutors.daemonThreadFactory(settings, "jclouds-io"))));
     }
 }
@@ -19,206 +19,43 @@
 
 package org.elasticsearch.gateway.cloud;
 
-import org.elasticsearch.ElasticSearchException;
 import org.elasticsearch.ElasticSearchIllegalArgumentException;
+import org.elasticsearch.cloud.blobstore.CloudBlobStore;
 import org.elasticsearch.cloud.blobstore.CloudBlobStoreService;
 import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.metadata.MetaData;
-import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.inject.Module;
-import org.elasticsearch.common.io.FastByteArrayInputStream;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.xcontent.ToXContent;
-import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.common.xcontent.builder.BinaryXContentBuilder;
-import org.elasticsearch.gateway.Gateway;
-import org.elasticsearch.gateway.GatewayException;
+import org.elasticsearch.gateway.blobstore.BlobStoreGateway;
 import org.elasticsearch.index.gateway.cloud.CloudIndexGatewayModule;
-import org.jclouds.blobstore.BlobStoreContext;
-import org.jclouds.blobstore.domain.Blob;
-import org.jclouds.blobstore.domain.PageSet;
-import org.jclouds.blobstore.domain.StorageMetadata;
-import org.jclouds.domain.Location;
 
 import java.io.IOException;
-import java.util.Set;
-
-import static org.jclouds.blobstore.options.ListContainerOptions.Builder.*;
 
 /**
  * @author kimchy (shay.banon)
  */
-public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements Gateway {
+public class CloudGateway extends BlobStoreGateway {
 
-    private final BlobStoreContext blobStoreContext;
-
-    private final String container;
-
-    private final Location location;
-
-    private final String metaDataDirectory;
-
-    private final ByteSizeValue chunkSize;
-
-    private volatile int currentIndex;
-
-    @Inject public CloudGateway(Settings settings, ClusterName clusterName, CloudBlobStoreService blobStoreService) {
+    @Inject public CloudGateway(Settings settings, ClusterName clusterName, CloudBlobStoreService blobStoreService) throws IOException {
         super(settings);
-        this.blobStoreContext = blobStoreService.context();
-
-        this.chunkSize = componentSettings.getAsBytesSize("chunk_size", null);
 
         String location = componentSettings.get("location");
-        if (location == null) {
-            this.location = null;
-        } else {
-            Location matchedLocation = null;
-            Set<? extends Location> assignableLocations = blobStoreContext.getBlobStore().listAssignableLocations();
-            for (Location oLocation : assignableLocations) {
-                if (oLocation.getId().equals(location)) {
-                    matchedLocation = oLocation;
-                    break;
-                }
-            }
-            this.location = matchedLocation;
-            if (this.location == null) {
-                throw new ElasticSearchIllegalArgumentException("Not a valid location [" + location + "], available locations " + assignableLocations);
-            }
-        }
-
-        this.container = componentSettings.get("container");
+        String container = componentSettings.get("container");
         if (container == null) {
             throw new ElasticSearchIllegalArgumentException("Cloud gateway requires 'container' setting");
         }
-        this.metaDataDirectory = clusterName.value() + "/metadata";
-        logger.debug("Using location [{}], container [{}], metadata_directory [{}]", this.location, this.container, metaDataDirectory);
-        blobStoreContext.getBlobStore().createContainerInLocation(this.location, container);
-
-        this.currentIndex = findLatestIndex();
-        logger.debug("Latest metadata found at index [" + currentIndex + "]");
+        initialize(new CloudBlobStore(settings, blobStoreService.context(), container, location), clusterName, new ByteSizeValue(100, ByteSizeUnit.MB));
     }
 
     @Override public String type() {
         return "cloud";
     }
 
-    public String container() {
-        return this.container;
-    }
-
-    public Location location() {
-        return this.location;
-    }
-
-    public ByteSizeValue chunkSize() {
-        return this.chunkSize;
-    }
-
-    @Override protected void doStart() throws ElasticSearchException {
-    }
-
-    @Override protected void doStop() throws ElasticSearchException {
-    }
-
-    @Override protected void doClose() throws ElasticSearchException {
-    }
-
-    @Override public void write(MetaData metaData) throws GatewayException {
-        try {
-            String name = metaDataDirectory + "/metadata-" + (currentIndex + 1);
-
-            BinaryXContentBuilder builder = XContentFactory.contentBinaryBuilder(XContentType.JSON);
-            builder.prettyPrint();
-            builder.startObject();
-            MetaData.Builder.toXContent(metaData, builder, ToXContent.EMPTY_PARAMS);
-            builder.endObject();
-
-            Blob blob = blobStoreContext.getBlobStore().newBlob(name);
-            blob.setPayload(new FastByteArrayInputStream(builder.unsafeBytes(), 0, builder.unsafeBytesLength()));
-            blob.setContentLength(builder.unsafeBytesLength());
-
-            blobStoreContext.getBlobStore().putBlob(container, blob);
-
-            currentIndex++;
-
-            PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(container, inDirectory(metaDataDirectory));
-            for (StorageMetadata storageMetadata : pageSet) {
-                if (storageMetadata.getName().contains("metadata-") && !name.equals(storageMetadata.getName())) {
-                    blobStoreContext.getAsyncBlobStore().removeBlob(container, storageMetadata.getName());
-                }
-            }
-        } catch (IOException e) {
-            throw new GatewayException("can't write new metadata file into the gateway", e);
-        }
-    }
-
-    @Override public MetaData read() throws GatewayException {
-        try {
-            if (currentIndex == -1)
-                return null;
-
-            return readMetaData(metaDataDirectory + "/metadata-" + currentIndex);
-        } catch (GatewayException e) {
-            throw e;
-        } catch (Exception e) {
-            throw new GatewayException("can't read metadata file from the gateway", e);
-        }
-    }
-
     @Override public Class<? extends Module> suggestIndexGateway() {
         return CloudIndexGatewayModule.class;
     }
-
-    @Override public void reset() {
-        PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(container, inDirectory(metaDataDirectory));
-        for (StorageMetadata storageMetadata : pageSet) {
-            if (storageMetadata.getName().contains("metadata-")) {
-                blobStoreContext.getBlobStore().removeBlob(container, storageMetadata.getName());
-            }
-        }
-        currentIndex = -1;
-    }
-
-    private int findLatestIndex() {
-        int index = -1;
-        PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(container, inDirectory(metaDataDirectory).maxResults(1000));
-        for (StorageMetadata storageMetadata : pageSet) {
-            if (logger.isTraceEnabled()) {
-                logger.trace("[findLatestMetadata]: Processing blob [" + storageMetadata.getName() + "]");
-            }
-            if (!storageMetadata.getName().contains("metadata-")) {
-                continue;
-            }
-            int fileIndex = Integer.parseInt(storageMetadata.getName().substring(storageMetadata.getName().lastIndexOf('-') + 1));
-            if (fileIndex >= index) {
-                // try and read the meta data
-                try {
-                    readMetaData(storageMetadata.getName());
-                    index = fileIndex;
-                } catch (IOException e) {
-                    logger.warn("[findLatestMetadata]: Failed to read metadata from [" + storageMetadata.getName() + "], ignoring...", e);
-                }
-            }
-        }
-        return index;
-    }
-
-    private MetaData readMetaData(String name) throws IOException {
-        XContentParser parser = null;
-        try {
-            Blob blob = blobStoreContext.getBlobStore().getBlob(container, name);
-            parser = XContentFactory.xContent(XContentType.JSON).createParser(blob.getContent());
-            return MetaData.Builder.fromXContent(parser, settings);
-        } finally {
-            if (parser != null) {
-                parser.close();
-            }
-        }
-    }
 }
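With the rewrite above, CloudGateway only resolves its location and container settings and hands everything else to BlobStoreGateway.initialize with a 100mb default chunk size. A hedged sketch of node settings that would drive it, assuming the ImmutableSettings builder and the usual componentSettings prefixing convention; none of these keys are spelled out in this commit:

    // hypothetical configuration, key prefixes are an assumption
    Settings settings = ImmutableSettings.settingsBuilder()
            .put("gateway.type", "cloud")
            .put("gateway.cloud.container", "my-es-gateway")   // required by the constructor above
            .put("gateway.cloud.location", "us-east-1")        // optional, made-up location id
            .put("gateway.cloud.chunk_size", "100mb")          // overrides the 100mb default
            .build();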
@@ -19,121 +19,28 @@
 
 package org.elasticsearch.index.gateway.cloud;
 
-import org.elasticsearch.ElasticSearchException;
-import org.elasticsearch.ElasticSearchIllegalArgumentException;
-import org.elasticsearch.cloud.blobstore.CloudBlobStoreService;
-import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.gateway.Gateway;
-import org.elasticsearch.gateway.cloud.CloudGateway;
-import org.elasticsearch.index.AbstractIndexComponent;
 import org.elasticsearch.index.Index;
-import org.elasticsearch.index.gateway.IndexGateway;
 import org.elasticsearch.index.gateway.IndexShardGateway;
+import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexGateway;
 import org.elasticsearch.index.settings.IndexSettings;
-import org.jclouds.blobstore.BlobStoreContext;
-import org.jclouds.domain.Location;
-
-import java.util.Set;
 
 /**
  * @author kimchy (shay.banon)
  */
-public class CloudIndexGateway extends AbstractIndexComponent implements IndexGateway {
+public class CloudIndexGateway extends BlobStoreIndexGateway {
 
-    private final Gateway gateway;
-
-    private final String indexContainer;
-
-    private final String indexDirectory;
-
-    private final Location location;
-
-    private final ByteSizeValue chunkSize;
-
-    private final BlobStoreContext blobStoreContext;
-
-    @Inject public CloudIndexGateway(Index index, @IndexSettings Settings indexSettings, ClusterName clusterName, CloudBlobStoreService blobStoreService, Gateway gateway) {
-        super(index, indexSettings);
-        this.blobStoreContext = blobStoreService.context();
-        this.gateway = gateway;
-
-        String location = componentSettings.get("location");
-        String container = componentSettings.get("container");
-        ByteSizeValue chunkSize = componentSettings.getAsBytesSize("chunk_size", null);
-
-        if (gateway instanceof CloudGateway) {
-            CloudGateway cloudGateway = (CloudGateway) gateway;
-            if (container == null) {
-                container = cloudGateway.container();
-            }
-            if (chunkSize == null) {
-                chunkSize = cloudGateway.chunkSize();
-            }
-        }
-
-        if (chunkSize == null) {
-            chunkSize = new ByteSizeValue(1, ByteSizeUnit.GB);
-        }
-
-        if (location == null) {
-            if (gateway instanceof CloudGateway) {
-                CloudGateway cloudGateway = (CloudGateway) gateway;
-                this.location = cloudGateway.location();
-            } else {
-                this.location = null;
-            }
-        } else {
-            Location matchedLocation = null;
-            Set<? extends Location> assignableLocations = blobStoreContext.getBlobStore().listAssignableLocations();
-            for (Location oLocation : assignableLocations) {
-                if (oLocation.getId().equals(location)) {
-                    matchedLocation = oLocation;
-                    break;
-                }
-            }
-            this.location = matchedLocation;
-            if (this.location == null) {
-                throw new ElasticSearchIllegalArgumentException("Not a valid location [" + location + "], available locations " + assignableLocations);
-            }
-        }
-        this.indexContainer = container;
-        this.indexDirectory = clusterName.value() + "/indices/" + index.name();
-        this.chunkSize = chunkSize;
-
-        logger.debug("Using location [{}], container [{}], index_directory [{}], chunk_size [{}]", this.location, this.indexContainer, this.indexDirectory, this.chunkSize);
+    @Inject public CloudIndexGateway(Index index, @IndexSettings Settings indexSettings, Gateway gateway) {
+        super(index, indexSettings, gateway);
     }
 
     @Override public String type() {
         return "cloud";
     }
 
-    public Location indexLocation() {
-        return this.location;
-    }
-
-    public String indexContainer() {
-        return this.indexContainer;
-    }
-
-    public String indexDirectory() {
-        return this.indexDirectory;
-    }
-
-    public ByteSizeValue chunkSize() {
-        return this.chunkSize;
-    }
-
     @Override public Class<? extends IndexShardGateway> shardGatewayClass() {
         return CloudIndexShardGateway.class;
     }
-
-    @Override public void close(boolean delete) throws ElasticSearchException {
-        if (!delete) {
-            return;
-        }
-    }
 }
@ -19,571 +19,28 @@
|
||||||
|
|
||||||
package org.elasticsearch.index.gateway.cloud;
|
package org.elasticsearch.index.gateway.cloud;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
|
||||||
import org.apache.lucene.index.IndexReader;
|
|
||||||
import org.apache.lucene.store.Directory;
|
|
||||||
import org.apache.lucene.store.IndexInput;
|
|
||||||
import org.apache.lucene.store.IndexOutput;
|
|
||||||
import org.elasticsearch.ElasticSearchIllegalStateException;
|
|
||||||
import org.elasticsearch.cloud.blobstore.CloudBlobStoreService;
|
|
||||||
import org.elasticsearch.common.collect.Lists;
|
|
||||||
import org.elasticsearch.common.collect.Maps;
|
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
|
|
||||||
import org.elasticsearch.common.lucene.Directories;
|
|
||||||
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
|
|
||||||
import org.elasticsearch.common.lucene.store.ThreadSafeInputStreamIndexInput;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
|
||||||
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
|
|
||||||
import org.elasticsearch.index.gateway.IndexGateway;
|
import org.elasticsearch.index.gateway.IndexGateway;
|
||||||
import org.elasticsearch.index.gateway.IndexShardGateway;
|
import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexShardGateway;
|
||||||
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
|
|
||||||
import org.elasticsearch.index.gateway.IndexShardGatewaySnapshotFailedException;
|
|
||||||
import org.elasticsearch.index.settings.IndexSettings;
|
import org.elasticsearch.index.settings.IndexSettings;
|
||||||
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.index.shard.service.IndexShard;
|
import org.elasticsearch.index.shard.service.IndexShard;
|
||||||
import org.elasticsearch.index.shard.service.InternalIndexShard;
|
|
||||||
import org.elasticsearch.index.store.Store;
|
import org.elasticsearch.index.store.Store;
|
||||||
import org.elasticsearch.index.translog.Translog;
|
|
||||||
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
|
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.jclouds.blobstore.BlobStoreContext;
|
|
||||||
import org.jclouds.blobstore.domain.Blob;
|
|
||||||
import org.jclouds.blobstore.domain.PageSet;
|
|
||||||
import org.jclouds.blobstore.domain.StorageMetadata;
|
|
||||||
import org.jclouds.blobstore.options.ListContainerOptions;
|
|
||||||
import org.jclouds.domain.Location;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
|
|
||||||
import static org.elasticsearch.index.translog.TranslogStreams.*;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author kimchy (shay.banon)
|
* @author kimchy (shay.banon)
|
||||||
*/
|
*/
|
||||||
public class CloudIndexShardGateway extends AbstractIndexShardComponent implements IndexShardGateway {
|
public class CloudIndexShardGateway extends BlobStoreIndexShardGateway {
|
||||||
|
|
||||||
private final InternalIndexShard indexShard;
|
@Inject public CloudIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway indexGateway,
|
||||||
|
IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) {
|
||||||
private final ThreadPool threadPool;
|
super(shardId, indexSettings, threadPool, indexGateway, indexShard, store, recoveryThrottler);
|
||||||
|
|
||||||
private final RecoveryThrottler recoveryThrottler;
|
|
||||||
|
|
||||||
private final Store store;
|
|
||||||
|
|
||||||
private final Location shardLocation;
|
|
||||||
|
|
||||||
private final String container;
|
|
||||||
|
|
||||||
private final String shardDirectory;
|
|
||||||
|
|
||||||
private final String shardIndexDirectory;
|
|
||||||
|
|
||||||
private final String shardTranslogDirectory;
|
|
||||||
|
|
||||||
private final BlobStoreContext blobStoreContext;
|
|
||||||
|
|
||||||
private final ByteSizeValue chunkSize;
|
|
||||||
|
|
||||||
private volatile int currentTranslogPartToWrite = 1;
|
|
||||||
|
|
||||||
@Inject public CloudIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, IndexShard indexShard, ThreadPool threadPool,
|
|
||||||
Store store, RecoveryThrottler recoveryThrottler, IndexGateway cloudIndexGateway, CloudBlobStoreService blobStoreService) {
|
|
||||||
super(shardId, indexSettings);
|
|
||||||
this.indexShard = (InternalIndexShard) indexShard;
|
|
||||||
this.threadPool = threadPool;
|
|
||||||
this.recoveryThrottler = recoveryThrottler;
|
|
||||||
this.store = store;
|
|
||||||
this.blobStoreContext = blobStoreService.context();
|
|
||||||
|
|
||||||
this.chunkSize = ((CloudIndexGateway) cloudIndexGateway).chunkSize();
|
|
||||||
this.shardLocation = ((CloudIndexGateway) cloudIndexGateway).indexLocation();
|
|
||||||
this.container = ((CloudIndexGateway) cloudIndexGateway).indexContainer();
|
|
||||||
|
|
||||||
this.shardDirectory = ((CloudIndexGateway) cloudIndexGateway).indexDirectory() + "/" + shardId.id();
|
|
||||||
this.shardIndexDirectory = shardDirectory + "/index";
|
|
||||||
this.shardTranslogDirectory = shardDirectory + "/translog";
|
|
||||||
|
|
||||||
logger.trace("Using location [{}], container [{}], shard_directory [{}]", this.shardLocation, this.container, this.shardDirectory);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public String type() {
|
@Override public String type() {
|
||||||
return "cloud";
|
return "cloud";
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public boolean requiresSnapshotScheduling() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public String toString() {
|
|
||||||
StringBuilder sb = new StringBuilder("cloud[");
|
|
||||||
if (shardLocation != null) {
|
|
||||||
sb.append(shardLocation).append("/");
|
|
||||||
}
|
|
||||||
sb.append(container).append("]");
|
|
||||||
return sb.toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public void close(boolean delete) {
|
|
||||||
if (!delete) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
Map<String, StorageMetadata> metaDatas = listAllMetadatas(container, shardIndexDirectory);
|
|
||||||
for (Map.Entry<String, StorageMetadata> entry : metaDatas.entrySet()) {
|
|
||||||
blobStoreContext.getAsyncBlobStore().removeBlob(container, entry.getKey());
|
|
||||||
}
|
|
||||||
metaDatas = listAllMetadatas(container, shardTranslogDirectory);
|
|
||||||
for (Map.Entry<String, StorageMetadata> entry : metaDatas.entrySet()) {
|
|
||||||
blobStoreContext.getAsyncBlobStore().removeBlob(container, entry.getKey());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException {
|
|
||||||
RecoveryStatus.Index recoveryStatusIndex = recoverIndex();
|
|
||||||
RecoveryStatus.Translog recoveryStatusTranslog = recoverTranslog();
|
|
||||||
return new RecoveryStatus(recoveryStatusIndex, recoveryStatusTranslog);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public SnapshotStatus snapshot(Snapshot snapshot) {
|
|
||||||
long totalTimeStart = System.currentTimeMillis();
|
|
||||||
boolean indexDirty = false;
|
|
||||||
|
|
||||||
final SnapshotIndexCommit snapshotIndexCommit = snapshot.indexCommit();
|
|
||||||
final Translog.Snapshot translogSnapshot = snapshot.translogSnapshot();
|
|
||||||
|
|
||||||
Map<String, StorageMetadata> allIndicesMetadata = null;
|
|
||||||
|
|
||||||
int indexNumberOfFiles = 0;
|
|
||||||
long indexTotalFilesSize = 0;
|
|
||||||
long indexTime = 0;
|
|
||||||
if (snapshot.indexChanged()) {
|
|
||||||
long time = System.currentTimeMillis();
|
|
||||||
indexDirty = true;
|
|
||||||
allIndicesMetadata = listAllMetadatas(container, shardIndexDirectory);
|
|
||||||
// snapshot into the index
|
|
||||||
final CountDownLatch latch = new CountDownLatch(snapshotIndexCommit.getFiles().length);
|
|
||||||
final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<Throwable>();
|
|
||||||
for (final String fileName : snapshotIndexCommit.getFiles()) {
|
|
||||||
// don't copy over the segments file, it will be copied over later on as part of the
|
|
||||||
// final snapshot phase
|
|
||||||
if (fileName.equals(snapshotIndexCommit.getSegmentsFileName())) {
|
|
||||||
latch.countDown();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
IndexInput indexInput = null;
|
|
||||||
try {
|
|
||||||
indexInput = snapshotIndexCommit.getDirectory().openInput(fileName);
|
|
||||||
long totalLength = 0;
|
|
||||||
int counter = 0;
|
|
||||||
while (true) {
|
|
||||||
String blobName = shardIndexDirectory + "/" + fileName;
|
|
||||||
if (counter > 0) {
|
|
||||||
blobName = blobName + ".part" + counter;
|
|
||||||
}
|
|
||||||
StorageMetadata metadata = allIndicesMetadata.get(blobName);
|
|
||||||
if (metadata == null) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
totalLength += metadata.getSize();
|
|
||||||
counter++;
|
|
||||||
}
|
|
||||||
if (totalLength == indexInput.length()) {
|
|
||||||
// we assume its the same one, no need to copy
|
|
||||||
latch.countDown();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.debug("Failed to verify file equality based on length, copying...", e);
|
|
||||||
} finally {
|
|
||||||
if (indexInput != null) {
|
|
||||||
try {
|
|
||||||
indexInput.close();
|
|
||||||
} catch (IOException e) {
|
|
||||||
// ignore
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
indexNumberOfFiles++;
|
|
||||||
try {
|
|
||||||
indexTotalFilesSize += snapshotIndexCommit.getDirectory().fileLength(fileName);
|
|
||||||
} catch (IOException e) {
|
|
||||||
// ignore...
|
|
||||||
}
|
|
||||||
deleteFile(shardIndexDirectory + "/" + fileName, allIndicesMetadata);
|
|
||||||
try {
|
|
||||||
copyFromDirectory(snapshotIndexCommit.getDirectory(), fileName, latch, failures);
|
|
||||||
} catch (Exception e) {
|
|
||||||
failures.add(e);
|
|
||||||
latch.countDown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
latch.await();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
failures.add(e);
|
|
||||||
}
|
|
||||||
if (!failures.isEmpty()) {
|
|
||||||
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (index files)", failures.get(failures.size() - 1));
|
|
||||||
}
|
|
||||||
indexTime = System.currentTimeMillis() - time;
|
|
||||||
}
|
|
||||||
|
|
||||||
int translogNumberOfOperations = 0;
|
|
||||||
long translogTime = 0;
|
|
||||||
if (snapshot.newTranslogCreated()) {
|
|
||||||
currentTranslogPartToWrite = 1;
|
|
||||||
String translogBlobName = shardTranslogDirectory + "/" + String.valueOf(translogSnapshot.translogId()) + "." + currentTranslogPartToWrite;
|
|
||||||
|
|
||||||
try {
|
|
||||||
long time = System.currentTimeMillis();
|
|
||||||
|
|
||||||
if (true) {
|
|
||||||
throw new ElasticSearchIllegalStateException("cloud plugin is currently disabled");
|
|
||||||
}
|
|
||||||
|
|
||||||
currentTranslogPartToWrite++;
|
|
||||||
|
|
||||||
translogTime = System.currentTimeMillis() - time;
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to snapshot translog into [" + translogBlobName + "]", e);
|
|
||||||
}
|
|
||||||
} else if (snapshot.sameTranslogNewOperations()) {
|
|
||||||
String translogBlobName = shardTranslogDirectory + "/" + String.valueOf(translogSnapshot.translogId()) + "." + currentTranslogPartToWrite;
|
|
||||||
try {
|
|
||||||
long time = System.currentTimeMillis();
|
|
||||||
|
|
||||||
if (true) {
|
|
||||||
throw new ElasticSearchIllegalStateException("cloud plugin is currently disabled");
|
|
||||||
}
|
|
||||||
|
|
||||||
currentTranslogPartToWrite++;
|
|
||||||
|
|
||||||
translogTime = System.currentTimeMillis() - time;
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to append snapshot translog into [" + translogBlobName + "]", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// now write the segments file
|
|
||||||
try {
|
|
||||||
if (indexDirty) {
|
|
||||||
indexNumberOfFiles++;
|
|
||||||
deleteFile(snapshotIndexCommit.getSegmentsFileName(), allIndicesMetadata);
|
|
||||||
indexTotalFilesSize += snapshotIndexCommit.getDirectory().fileLength(snapshotIndexCommit.getSegmentsFileName());
|
|
||||||
long time = System.currentTimeMillis();
|
|
||||||
|
|
||||||
|
|
||||||
IndexInput indexInput = snapshotIndexCommit.getDirectory().openInput(snapshotIndexCommit.getSegmentsFileName());
|
|
||||||
try {
|
|
||||||
Blob blob = blobStoreContext.getBlobStore().newBlob(shardIndexDirectory + "/" + snapshotIndexCommit.getSegmentsFileName());
|
|
||||||
InputStreamIndexInput is = new InputStreamIndexInput(indexInput, Long.MAX_VALUE);
|
|
||||||
blob.setPayload(is);
|
|
||||||
blob.setContentLength(is.actualSizeToRead());
|
|
||||||
blobStoreContext.getBlobStore().putBlob(container, blob);
|
|
||||||
} finally {
|
|
||||||
try {
|
|
||||||
indexInput.close();
|
|
||||||
} catch (Exception e) {
|
|
||||||
// ignore
|
|
||||||
}
|
|
||||||
}
|
|
||||||
indexTime += (System.currentTimeMillis() - time);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to finalize index snapshot into [" + snapshotIndexCommit.getSegmentsFileName() + "]", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
// delete the old translog
|
|
||||||
if (snapshot.newTranslogCreated()) {
|
|
||||||
String currentTranslogPrefix = shardTranslogDirectory + "/" + String.valueOf(translogSnapshot.translogId()) + ".";
|
|
||||||
Map<String, StorageMetadata> allMetadatas = listAllMetadatas(container, shardTranslogDirectory);
|
|
||||||
for (Map.Entry<String, StorageMetadata> entry : allMetadatas.entrySet()) {
|
|
||||||
if (!entry.getKey().startsWith(currentTranslogPrefix)) {
|
|
||||||
blobStoreContext.getAsyncBlobStore().removeBlob(container, entry.getKey());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (indexDirty) {
|
|
||||||
for (Map.Entry<String, StorageMetadata> entry : allIndicesMetadata.entrySet()) {
|
|
||||||
String blobNameToMatch = entry.getKey();
|
|
||||||
if (blobNameToMatch.contains(".part")) {
|
|
||||||
blobNameToMatch = blobNameToMatch.substring(0, blobNameToMatch.indexOf(".part"));
|
|
||||||
}
|
|
||||||
// remove the directory prefix
|
|
||||||
blobNameToMatch = blobNameToMatch.substring(shardIndexDirectory.length() + 1);
|
|
||||||
boolean found = false;
|
|
||||||
for (final String fileName : snapshotIndexCommit.getFiles()) {
|
|
||||||
if (blobNameToMatch.equals(fileName)) {
|
|
||||||
found = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (!found) {
|
|
||||||
blobStoreContext.getAsyncBlobStore().removeBlob(container, entry.getKey());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return new SnapshotStatus(new TimeValue(System.currentTimeMillis() - totalTimeStart),
|
|
||||||
new SnapshotStatus.Index(indexNumberOfFiles, new ByteSizeValue(indexTotalFilesSize), new TimeValue(indexTime)),
|
|
||||||
new SnapshotStatus.Translog(translogNumberOfOperations, new TimeValue(translogTime)));
|
|
||||||
}
|
|
||||||
|
|
||||||
private RecoveryStatus.Index recoverIndex() throws IndexShardGatewayRecoveryException {
|
|
||||||
final Map<String, StorageMetadata> allMetaDatas = listAllMetadatas(container, shardIndexDirectory);
|
|
||||||
|
|
||||||
// filter out to only have actual files
|
|
||||||
final Map<String, StorageMetadata> filesMetaDatas = Maps.newHashMap();
|
|
||||||
for (Map.Entry<String, StorageMetadata> entry : allMetaDatas.entrySet()) {
|
|
||||||
if (entry.getKey().contains(".part")) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
filesMetaDatas.put(entry.getKey(), entry.getValue());
|
|
||||||
}
|
|
||||||
|
|
||||||
final CountDownLatch latch = new CountDownLatch(filesMetaDatas.size());
|
|
||||||
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
|
|
||||||
final AtomicLong throttlingWaitTime = new AtomicLong();
|
|
||||||
for (final Map.Entry<String, StorageMetadata> entry : filesMetaDatas.entrySet()) {
|
|
||||||
threadPool.execute(new Runnable() {
|
|
||||||
@Override public void run() {
|
|
||||||
try {
|
|
||||||
long throttlingStartTime = System.currentTimeMillis();
|
|
||||||
while (!recoveryThrottler.tryStream(shardId, entry.getKey())) {
|
|
||||||
Thread.sleep(recoveryThrottler.throttleInterval().millis());
|
|
||||||
}
|
|
||||||
throttlingWaitTime.addAndGet(System.currentTimeMillis() - throttlingStartTime);
|
|
||||||
copyToDirectory(entry.getValue(), allMetaDatas);
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.debug("Failed to read [" + entry.getKey() + "] into [" + store + "]", e);
|
|
||||||
lastException.set(e);
|
|
||||||
} finally {
|
|
||||||
recoveryThrottler.streamDone(shardId, entry.getKey());
|
|
||||||
latch.countDown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
latch.await();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
lastException.set(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
long totalSize = 0;
|
|
||||||
for (Map.Entry<String, StorageMetadata> entry : allMetaDatas.entrySet()) {
|
|
||||||
totalSize += entry.getValue().getSize();
|
|
||||||
}
|
|
||||||
|
|
||||||
long version = -1;
|
|
||||||
try {
|
|
||||||
if (IndexReader.indexExists(store.directory())) {
|
|
||||||
version = IndexReader.getCurrentVersion(store.directory());
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
return new RecoveryStatus.Index(version, filesMetaDatas.size(), new ByteSizeValue(totalSize), 0, new ByteSizeValue(0), TimeValue.timeValueMillis(throttlingWaitTime.get()), TimeValue.timeValueMillis(-1));
|
|
||||||
}
|
|
||||||
|
|
||||||
    private RecoveryStatus.Translog recoverTranslog() throws IndexShardGatewayRecoveryException {
        final Map<String, StorageMetadata> allMetaDatas = listAllMetadatas(container, shardTranslogDirectory);

        long latestTranslogId = -1;
        for (String name : allMetaDatas.keySet()) {
            String translogName = name.substring(shardTranslogDirectory.length() + 1);
            long translogId = Long.parseLong(translogName.substring(0, translogName.lastIndexOf('.')));
            if (translogId > latestTranslogId) {
                latestTranslogId = translogId;
            }
        }

        if (latestTranslogId == -1) {
            // no recovery file found, start the shard and bail
            indexShard.start();
            return new RecoveryStatus.Translog(0, TimeValue.timeValueMillis(0));
        }

        try {
            ArrayList<Translog.Operation> operations = Lists.newArrayList();

            long size = 0;
            int index = 1;
            while (true) {
                String translogPartName = shardTranslogDirectory + "/" + String.valueOf(latestTranslogId) + "." + index;
                if (!allMetaDatas.containsKey(translogPartName)) {
                    break;
                }
                Blob blob = blobStoreContext.getBlobStore().getBlob(container, translogPartName);
                if (blob == null) {
                    break;
                }
                size += blob.getContentLength();
                InputStreamStreamInput streamInput = new InputStreamStreamInput(blob.getContent());
                int numberOfOperations = streamInput.readInt();
                for (int i = 0; i < numberOfOperations; i++) {
                    operations.add(readTranslogOperation(streamInput));
                }
                index++;
            }
            currentTranslogPartToWrite = index;

            indexShard.performRecoveryPrepareForTranslog();
            indexShard.performRecoveryFinalization(true);

            return new RecoveryStatus.Translog(operations.size(), TimeValue.timeValueMillis(-1));
        } catch (Exception e) {
            throw new IndexShardGatewayRecoveryException(shardId(), "Failed to perform recovery of translog", e);
        }
    }

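    // Lists all blob metadata under a directory, following the jclouds next-marker paging so
    // listings larger than the 10000-entry page size are not silently truncated.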
    private Map<String, StorageMetadata> listAllMetadatas(String container, String directory) {
        final Map<String, StorageMetadata> allMetaDatas = Maps.newHashMap();

        String nextMarker = null;
        while (true) {
            ListContainerOptions options = ListContainerOptions.Builder.inDirectory(directory).maxResults(10000);
            if (nextMarker != null) {
                options.afterMarker(nextMarker);
            }
            PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(container, options);
            for (StorageMetadata metadata : pageSet) {
                allMetaDatas.put(metadata.getName(), metadata);
            }
            nextMarker = pageSet.getNextMarker();
            if (nextMarker == null) {
                break;
            }
        }
        return allMetaDatas;
    }

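    // Deletes a logical file from the blob store, including any "<name>.part<n>" chunk blobs
    // that were written for it.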
    private void deleteFile(String fileName, Map<String, StorageMetadata> allIndicesMetadata) {
        // first, check and delete all files with this name
        for (Map.Entry<String, StorageMetadata> entry : allIndicesMetadata.entrySet()) {
            String blobName = entry.getKey();
            if (blobName.contains(".part")) {
                blobName = blobName.substring(0, blobName.indexOf(".part"));
            }
            if (blobName.equals(fileName)) {
                // remove the actual blob (the base file or one of its ".part" chunks)
                blobStoreContext.getBlobStore().removeBlob(container, entry.getKey());
            }
        }
    }

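    // Snapshot upload path: a file larger than chunk_size is uploaded as the base blob plus
    // "<name>.part<n>" blobs, one chunk per blob, written asynchronously; the shared latch is
    // counted down once all chunks of this file have completed (or failed).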
    private void copyFromDirectory(Directory dir, String fileName, final CountDownLatch latch, final CopyOnWriteArrayList<Throwable> failures) throws Exception {
        long totalLength = dir.fileLength(fileName);
        long numberOfChunks = totalLength / chunkSize.bytes();
        if (totalLength % chunkSize.bytes() > 0) {
            numberOfChunks++;
        }

        final AtomicLong counter = new AtomicLong(numberOfChunks);
        for (long i = 0; i < numberOfChunks; i++) {
            final long chunkNumber = i;

            IndexInput indexInput = null;
            try {
                indexInput = dir.openInput(fileName);
                indexInput.seek(chunkNumber * chunkSize.bytes());
                InputStreamIndexInput is = new ThreadSafeInputStreamIndexInput(indexInput, chunkSize.bytes());

                String blobName = shardIndexDirectory + "/" + fileName;
                if (chunkNumber > 0) {
                    blobName += ".part" + chunkNumber;
                }

                Blob blob = blobStoreContext.getBlobStore().newBlob(blobName);
                blob.setPayload(is);
                blob.setContentLength(is.actualSizeToRead());

                final IndexInput fIndexInput = indexInput;
                final ListenableFuture<String> future = blobStoreContext.getAsyncBlobStore().putBlob(container, blob);
                future.addListener(new Runnable() {
                    @Override public void run() {
                        try {
                            fIndexInput.close();
                        } catch (IOException e) {
                            // ignore
                        }

                        if (!future.isCancelled()) {
                            try {
                                future.get();
                            } catch (ExecutionException e) {
                                failures.add(e.getCause());
                            } catch (Exception e) {
                                failures.add(e);
                            }
                        }

                        if (counter.decrementAndGet() == 0) {
                            latch.countDown();
                        }
                    }
                }, threadPool);
            } catch (Exception e) {
                if (indexInput != null) {
                    try {
                        indexInput.close();
                    } catch (IOException e1) {
                        // ignore
                    }
                }
                failures.add(e);
            }
        }
    }

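    // Download path: stream the base blob into a fresh IndexOutput, then append each
    // "<name>.part<n>" blob in order, and finally sync the file in the local store directory.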
    private void copyToDirectory(StorageMetadata metadata, Map<String, StorageMetadata> allMetadatas) throws IOException {
        String fileName = metadata.getName().substring(shardIndexDirectory.length() + 1);

        Blob blob = blobStoreContext.getBlobStore().getBlob(container, metadata.getName());

        byte[] buffer = new byte[16384];
        IndexOutput indexOutput = store.directory().createOutput(fileName);

        copy(blob.getContent(), indexOutput, buffer);
        blob.getContent().close();

        // check the metadatas we have
        int part = 1;
        while (true) {
            String partName = metadata.getName() + ".part" + part;
            if (!allMetadatas.containsKey(partName)) {
                break;
            }
            blob = blobStoreContext.getBlobStore().getBlob(container, partName);
            copy(blob.getContent(), indexOutput, buffer);
            blob.getContent().close();
            part++;
        }

        indexOutput.close();

        Directories.sync(store.directory(), fileName);
    }

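    // Simple stream-to-IndexOutput copy using the caller-provided buffer.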
    private void copy(InputStream is, IndexOutput indexOutput, byte[] buffer) throws IOException {
        int len;
        while ((len = is.read(buffer)) != -1) {
            indexOutput.writeBytes(buffer, len);
        }
    }
}
@ -68,7 +68,7 @@ public class HdfsGateway extends BlobStoreGateway {

        fileSystem = FileSystem.get(URI.create(uri), conf);

        initialize(new HdfsBlobStore(settings, fileSystem, hPath), clusterName);
        initialize(new HdfsBlobStore(settings, fileSystem, hPath), clusterName, null);
    }

    @Override public String type() {