allow to control the buffer size of the gateway blob container
This commit is contained in:
parent
a7c13826da
commit
3770924300
|
@ -22,6 +22,7 @@ package org.elasticsearch.common.blobstore.fs;
|
|||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
|
||||
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
|
||||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
|
||||
import java.io.File;
|
||||
|
@ -51,7 +52,7 @@ public abstract class AbstractFsBlobContainer extends AbstractBlobContainer {
|
|||
}
|
||||
ImmutableMap.Builder<String, BlobMetaData> builder = ImmutableMap.builder();
|
||||
for (File file : files) {
|
||||
builder.put(file.getName(), new FsBlobMetaData(file));
|
||||
builder.put(file.getName(), new PlainBlobMetaData(file.getName(), file.length()));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
@ -63,7 +64,7 @@ public abstract class AbstractFsBlobContainer extends AbstractBlobContainer {
|
|||
@Override public void readBlob(final String blobName, final ReadBlobListener listener) {
|
||||
blobStore.executorService().execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
byte[] buffer = new byte[1024 * 16];
|
||||
byte[] buffer = new byte[blobStore.bufferSizeInBytes()];
|
||||
FileInputStream is;
|
||||
try {
|
||||
is = new FileInputStream(new File(path, blobName));
|
||||
|
|
|
@ -1,44 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.blobstore.fs;
|
||||
|
||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class FsBlobMetaData implements BlobMetaData {
|
||||
|
||||
private final File file;
|
||||
|
||||
public FsBlobMetaData(File file) {
|
||||
this.file = file;
|
||||
}
|
||||
|
||||
@Override public String name() {
|
||||
return file.getName();
|
||||
}
|
||||
|
||||
@Override public long sizeInBytes() {
|
||||
return file.length();
|
||||
}
|
||||
}
|
|
@ -22,6 +22,8 @@ package org.elasticsearch.common.blobstore.fs;
|
|||
import org.elasticsearch.common.blobstore.*;
|
||||
import org.elasticsearch.common.io.FileSystemUtils;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
@ -39,6 +41,8 @@ public class FsBlobStore implements BlobStore {
|
|||
|
||||
private final ExecutorService executorService;
|
||||
|
||||
private final int bufferSizeInBytes;
|
||||
|
||||
public FsBlobStore(Settings settings, File path) {
|
||||
this.path = path;
|
||||
if (!path.exists()) {
|
||||
|
@ -50,7 +54,8 @@ public class FsBlobStore implements BlobStore {
|
|||
if (!path.isDirectory()) {
|
||||
throw new BlobStoreException("Path is not a directory at [" + path + "]");
|
||||
}
|
||||
executorService = Executors.newCachedThreadPool(daemonThreadFactory(settings, "fs_blobstore"));
|
||||
this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
|
||||
this.executorService = Executors.newCachedThreadPool(daemonThreadFactory(settings, "fs_blobstore"));
|
||||
}
|
||||
|
||||
@Override public String toString() {
|
||||
|
@ -61,6 +66,10 @@ public class FsBlobStore implements BlobStore {
|
|||
return path;
|
||||
}
|
||||
|
||||
public int bufferSizeInBytes() {
|
||||
return this.bufferSizeInBytes;
|
||||
}
|
||||
|
||||
public ExecutorService executorService() {
|
||||
return executorService;
|
||||
}
|
||||
|
|
|
@ -48,7 +48,7 @@ public class FsImmutableBlobContainer extends AbstractFsBlobContainer implements
|
|||
}
|
||||
try {
|
||||
try {
|
||||
byte[] buffer = new byte[16 * 1024];
|
||||
byte[] buffer = new byte[blobStore.bufferSizeInBytes()];
|
||||
int bytesRead;
|
||||
while ((bytesRead = is.read(buffer)) != -1) {
|
||||
raf.write(buffer, 0, bytesRead);
|
||||
|
|
|
@ -81,7 +81,7 @@ public abstract class AbstractHdfsBlobContainer extends AbstractBlobContainer {
|
|||
@Override public void readBlob(final String blobName, final ReadBlobListener listener) {
|
||||
blobStore.executorService().execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
byte[] buffer = new byte[1024 * 16];
|
||||
byte[] buffer = new byte[blobStore.bufferSizeInBytes()];
|
||||
|
||||
|
||||
FSDataInputStream fileStream;
|
||||
|
|
|
@ -26,6 +26,8 @@ import org.elasticsearch.common.blobstore.BlobPath;
|
|||
import org.elasticsearch.common.blobstore.BlobStore;
|
||||
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
@ -42,6 +44,8 @@ public class HdfsBlobStore implements BlobStore {
|
|||
|
||||
private final Path path;
|
||||
|
||||
private final int bufferSizeInBytes;
|
||||
|
||||
private final ExecutorService executorService;
|
||||
|
||||
public HdfsBlobStore(Settings settings, FileSystem fileSystem, Path path) throws IOException {
|
||||
|
@ -52,6 +56,7 @@ public class HdfsBlobStore implements BlobStore {
|
|||
fileSystem.mkdirs(path);
|
||||
}
|
||||
|
||||
this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
|
||||
executorService = Executors.newCachedThreadPool(daemonThreadFactory(settings, "hdfs_blobstore"));
|
||||
}
|
||||
|
||||
|
@ -59,6 +64,10 @@ public class HdfsBlobStore implements BlobStore {
|
|||
return path.toString();
|
||||
}
|
||||
|
||||
public int bufferSizeInBytes() {
|
||||
return this.bufferSizeInBytes;
|
||||
}
|
||||
|
||||
public FileSystem fileSystem() {
|
||||
return fileSystem;
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ public class HdfsImmutableBlobContainer extends AbstractHdfsBlobContainer implem
|
|||
}
|
||||
try {
|
||||
try {
|
||||
byte[] buffer = new byte[16 * 1024];
|
||||
byte[] buffer = new byte[blobStore.bufferSizeInBytes()];
|
||||
int bytesRead;
|
||||
while ((bytesRead = is.read(buffer)) != -1) {
|
||||
fileStream.write(buffer, 0, bytesRead);
|
||||
|
|
Loading…
Reference in New Issue