Cloud Plugin: Cloud gateway default chunk size change to 1g, closes #186.

This commit is contained in:
kimchy 2010-05-22 01:08:43 +03:00
parent fb03652ecd
commit ecc74f225e
5 changed files with 129 additions and 43 deletions

View File

@ -43,13 +43,17 @@
<root url="jar://$GRADLE_REPOSITORY$/org.jclouds/jclouds-scriptbuilder/jars/jclouds-scriptbuilder-1.0-beta-5.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/org.jclouds/jclouds-vcloud/jars/jclouds-vcloud-1.0-beta-5.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/org.jclouds/jclouds-rackspace/jars/jclouds-rackspace-1.0-beta-5.jar!/" />
<root url="file://$GRADLE_REPOSITORY$/org.jboss.resteasy/resteasy-jaxrs-all" />
<root url="jar://$GRADLE_REPOSITORY$/org.jboss.resteasy/resteasy-jaxrs-client/jars/resteasy-jaxrs-client-1.2.1.GA.jar!/" />
</CLASSES>
<JAVADOC>
<root url="http://jclouds.rimuhosting.com/apidocs/" />
</JAVADOC>
<SOURCES />
<SOURCES>
<root url="jar://$MODULE_DIR$/../../../../../opt/jclouds/1.0-beta-5/core/src/jclouds-compute-1.0-beta-5-sources.jar!/" />
<root url="jar://$MODULE_DIR$/../../../../../opt/jclouds/1.0-beta-5/core/src/jclouds-core-1.0-beta-5-sources.jar!/" />
<root url="jar://$MODULE_DIR$/../../../../../opt/jclouds/1.0-beta-5/core/src/jclouds-blobstore-1.0-beta-5-sources.jar!/" />
<root url="jar://$MODULE_DIR$/../../../../../opt/jclouds/1.0-beta-5/providers/aws/src/jclouds-aws-1.0-beta-5-sources.jar!/" />
</SOURCES>
</library>
</orderEntry>
</component>

View File

@ -26,7 +26,7 @@ import java.util.concurrent.*;
/**
* @author kimchy (shay.banon)
*/
public interface ThreadPool {
public interface ThreadPool extends Executor {
ThreadPoolInfo info();

View File

@ -0,0 +1,38 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.util.lucene.store;
import org.apache.lucene.store.IndexInput;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class ThreadSafeInputStreamIndexInput extends InputStreamIndexInput {
public ThreadSafeInputStreamIndexInput(IndexInput indexInput, long limit) {
super(indexInput, limit);
}
@Override public synchronized int read(byte[] b, int off, int len) throws IOException {
return super.read(b, off, len);
}
}

View File

@ -76,7 +76,7 @@ public class CloudIndexGateway extends AbstractIndexComponent implements IndexGa
}
if (chunkSize == null) {
chunkSize = new SizeValue(4, SizeUnit.GB);
chunkSize = new SizeValue(1, SizeUnit.GB);
}
if (location == null) {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.gateway.cloud;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
@ -48,6 +49,7 @@ import org.elasticsearch.util.io.stream.BytesStreamOutput;
import org.elasticsearch.util.io.stream.InputStreamStreamInput;
import org.elasticsearch.util.lucene.Directories;
import org.elasticsearch.util.lucene.store.InputStreamIndexInput;
import org.elasticsearch.util.lucene.store.ThreadSafeInputStreamIndexInput;
import org.elasticsearch.util.settings.Settings;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
@ -60,7 +62,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -165,10 +169,9 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
long time = System.currentTimeMillis();
indexDirty = true;
allIndicesMetadata = listAllMetadatas(container, shardIndexDirectory);
final Map<String, StorageMetadata> allIndicesMetadataF = allIndicesMetadata;
// snapshot into the index
final CountDownLatch latch = new CountDownLatch(snapshotIndexCommit.getFiles().length);
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<Throwable>();
for (final String fileName : snapshotIndexCommit.getFiles()) {
// don't copy over the segments file, it will be copied over later on as part of the
// final snapshot phase
@ -179,8 +182,21 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
IndexInput indexInput = null;
try {
indexInput = snapshotIndexCommit.getDirectory().openInput(fileName);
StorageMetadata metadata = allIndicesMetadata.get(fileName);
if (metadata != null && (metadata.getSize() == indexInput.length())) {
long totalLength = 0;
int counter = 0;
while (true) {
String blobName = shardIndexDirectory + "/" + fileName;
if (counter > 0) {
blobName = blobName + ".part" + counter;
}
StorageMetadata metadata = allIndicesMetadata.get(blobName);
if (metadata == null) {
break;
}
totalLength += metadata.getSize();
counter++;
}
if (totalLength == indexInput.length()) {
// we assume its the same one, no need to copy
latch.countDown();
continue;
@ -203,25 +219,20 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
// ignore...
}
deleteFile(shardIndexDirectory + "/" + fileName, allIndicesMetadata);
threadPool.execute(new Runnable() {
@Override public void run() {
try {
copyFromDirectory(snapshotIndexCommit.getDirectory(), fileName, allIndicesMetadataF);
} catch (Exception e) {
lastException.set(e);
} finally {
latch.countDown();
}
}
});
try {
copyFromDirectory(snapshotIndexCommit.getDirectory(), fileName, latch, failures);
} catch (Exception e) {
failures.add(e);
latch.countDown();
}
}
try {
latch.await();
} catch (InterruptedException e) {
lastException.set(e);
failures.add(e);
}
if (lastException.get() != null) {
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (index files)", lastException.get());
if (!failures.isEmpty()) {
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (index files)", failures.get(failures.size() - 1));
}
indexTime = System.currentTimeMillis() - time;
}
@ -487,33 +498,66 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
}
private void copyFromDirectory(Directory dir, String fileName, Map<String, StorageMetadata> allIndicesMetadata) throws IOException {
IndexInput indexInput = dir.openInput(fileName);
private void copyFromDirectory(Directory dir, String fileName, final CountDownLatch latch, final CopyOnWriteArrayList<Throwable> failures) throws Exception {
long totalLength = dir.fileLength(fileName);
long numberOfChunks = totalLength / chunkSize.bytes();
if (totalLength % chunkSize.bytes() > 0) {
numberOfChunks++;
}
try {
Blob blob = blobStoreContext.getBlobStore().newBlob(shardIndexDirectory + "/" + fileName);
InputStreamIndexInput is = new InputStreamIndexInput(indexInput, chunkSize.bytes());
blob.setPayload(is);
blob.setContentLength(is.actualSizeToRead());
blobStoreContext.getBlobStore().putBlob(container, blob);
final AtomicLong counter = new AtomicLong(numberOfChunks);
for (long i = 0; i < numberOfChunks; i++) {
final long chunkNumber = i;
int part = 1;
while (indexInput.getFilePointer() < indexInput.length()) {
is = new InputStreamIndexInput(indexInput, chunkSize.bytes());
if (is.actualSizeToRead() <= 0) {
break;
IndexInput indexInput = null;
try {
indexInput = dir.openInput(fileName);
indexInput.seek(chunkNumber * chunkSize.bytes());
InputStreamIndexInput is = new ThreadSafeInputStreamIndexInput(indexInput, chunkSize.bytes());
String blobName = shardIndexDirectory + "/" + fileName;
if (chunkNumber > 0) {
blobName += ".part" + chunkNumber;
}
blob = blobStoreContext.getBlobStore().newBlob(shardIndexDirectory + "/" + fileName + ".part" + part);
Blob blob = blobStoreContext.getBlobStore().newBlob(blobName);
blob.setPayload(is);
blob.setContentLength(is.actualSizeToRead());
blobStoreContext.getBlobStore().putBlob(container, blob);
part++;
}
} finally {
try {
indexInput.close();
final IndexInput fIndexInput = indexInput;
final ListenableFuture<String> future = blobStoreContext.getAsyncBlobStore().putBlob(container, blob);
future.addListener(new Runnable() {
@Override public void run() {
try {
fIndexInput.close();
} catch (IOException e) {
// ignore
}
if (!future.isCancelled()) {
try {
future.get();
} catch (ExecutionException e) {
failures.add(e.getCause());
} catch (Exception e) {
failures.add(e);
}
}
if (counter.decrementAndGet() == 0) {
latch.countDown();
}
}
}, threadPool);
} catch (Exception e) {
// ignore
if (indexInput != null) {
try {
indexInput.close();
} catch (IOException e1) {
// ignore
}
}
failures.add(e);
}
}
}