HBASE-1155 Verify that FSDataoutputStream.sync() works
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/branches/trunk_on_hadoop-0.19.1-dev_with_hadoop-4379@743191 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e527c8794e
commit
69bbf6b099
Binary file not shown.
Binary file not shown.
|
@ -770,11 +770,13 @@ public class SequenceFile {
|
|||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
assert false : "hashCode not designed";
|
||||
return 42; // any arbitrary constant will do
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuffer sb = new StringBuffer();
|
||||
sb.append("size: ").append(this.theMetadata.size()).append("\n");
|
||||
|
@ -952,6 +954,13 @@ public class SequenceFile {
|
|||
}
|
||||
}
|
||||
|
||||
/** flush all currently written data to the file system */
|
||||
public void syncFs() throws IOException {
|
||||
if (out != null) {
|
||||
out.sync(); // flush contents to file system
|
||||
}
|
||||
}
|
||||
|
||||
/** Returns the configuration of this file. */
|
||||
Configuration getConf() { return conf; }
|
||||
|
||||
|
@ -1115,10 +1124,13 @@ public class SequenceFile {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isCompressed() { return true; }
|
||||
@Override
|
||||
boolean isBlockCompressed() { return false; }
|
||||
|
||||
/** Append a key/value pair. */
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public synchronized void append(Object key, Object val)
|
||||
throws IOException {
|
||||
|
@ -1151,6 +1163,7 @@ public class SequenceFile {
|
|||
}
|
||||
|
||||
/** Append a key/value pair. */
|
||||
@Override
|
||||
public synchronized void appendRaw(byte[] keyData, int keyOffset,
|
||||
int keyLength, ValueBytes val) throws IOException {
|
||||
|
||||
|
@ -1240,7 +1253,9 @@ public class SequenceFile {
|
|||
finalizeFileHeader();
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isCompressed() { return true; }
|
||||
@Override
|
||||
boolean isBlockCompressed() { return true; }
|
||||
|
||||
/** Initialize */
|
||||
|
@ -1268,6 +1283,7 @@ public class SequenceFile {
|
|||
}
|
||||
|
||||
/** Compress and flush contents to dfs */
|
||||
@Override
|
||||
public synchronized void sync() throws IOException {
|
||||
if (noBufferedRecords > 0) {
|
||||
super.sync();
|
||||
|
@ -1297,6 +1313,7 @@ public class SequenceFile {
|
|||
}
|
||||
|
||||
/** Close the file. */
|
||||
@Override
|
||||
public synchronized void close() throws IOException {
|
||||
if (out != null) {
|
||||
sync();
|
||||
|
@ -1305,6 +1322,7 @@ public class SequenceFile {
|
|||
}
|
||||
|
||||
/** Append a key/value pair. */
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public synchronized void append(Object key, Object val)
|
||||
throws IOException {
|
||||
|
@ -1337,6 +1355,7 @@ public class SequenceFile {
|
|||
}
|
||||
|
||||
/** Append a key/value pair. */
|
||||
@Override
|
||||
public synchronized void appendRaw(byte[] keyData, int keyOffset,
|
||||
int keyLength, ValueBytes val) throws IOException {
|
||||
|
||||
|
@ -1929,6 +1948,7 @@ public class SequenceFile {
|
|||
* of the value may be computed by calling buffer.getLength() before and
|
||||
* after calls to this method. */
|
||||
/** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
|
||||
@Deprecated
|
||||
public synchronized int next(DataOutputBuffer buffer) throws IOException {
|
||||
// Unsupported for block-compressed sequence files
|
||||
if (blockCompressed) {
|
||||
|
@ -2210,6 +2230,7 @@ public class SequenceFile {
|
|||
}
|
||||
|
||||
/** Returns the name of the file. */
|
||||
@Override
|
||||
public String toString() {
|
||||
return file.toString();
|
||||
}
|
||||
|
@ -2798,6 +2819,7 @@ public class SequenceFile {
|
|||
this.tmpDir = tmpDir;
|
||||
this.progress = progress;
|
||||
}
|
||||
@Override
|
||||
protected boolean lessThan(Object a, Object b) {
|
||||
// indicate we're making progress
|
||||
if (progress != null) {
|
||||
|
@ -2933,7 +2955,7 @@ public class SequenceFile {
|
|||
totalBytes += segmentsToMerge.get(i).segmentLength;
|
||||
}
|
||||
if (totalBytes != 0) //being paranoid
|
||||
progPerByte = 1.0f / (float)totalBytes;
|
||||
progPerByte = 1.0f / totalBytes;
|
||||
//reset factor to what it originally was
|
||||
factor = origFactor;
|
||||
return this;
|
||||
|
@ -3055,6 +3077,7 @@ public class SequenceFile {
|
|||
compareTo(that.segmentPathName.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (!(o instanceof SegmentDescriptor)) {
|
||||
return false;
|
||||
|
@ -3069,6 +3092,7 @@ public class SequenceFile {
|
|||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
|
||||
}
|
||||
|
@ -3159,6 +3183,7 @@ public class SequenceFile {
|
|||
/** The default cleanup. Subclasses can override this with a custom
|
||||
* cleanup
|
||||
*/
|
||||
@Override
|
||||
public void cleanup() throws IOException {
|
||||
super.close();
|
||||
if (super.shouldPreserveInput()) return;
|
||||
|
|
|
@ -0,0 +1,114 @@
|
|||
/**
|
||||
* Copyright 2009 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.io;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.EOFException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.io.IntWritable;
|
||||
import org.apache.hadoop.io.BytesWritable;
|
||||
import org.apache.hadoop.hbase.io.SequenceFile;
|
||||
|
||||
/** Tries to read the file created by Writer */
|
||||
public class Reader {
|
||||
|
||||
/**
|
||||
* @param args
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void main(String[] args) throws IOException {
|
||||
if (args.length < 1 || args.length > 2) {
|
||||
System.err.println("usage: Reader expected-number-of-records [ -n ]");
|
||||
System.err.println(" where -n = do not try to recover lease");
|
||||
return;
|
||||
}
|
||||
int expected = Integer.valueOf(args[0]);
|
||||
boolean recover = true;
|
||||
if (args.length == 2 && args[1].compareTo("-n") == 0) {
|
||||
recover = false;
|
||||
}
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
Path dir = new Path(conf.get("fs.default.name"), "log");
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
if (!(fs instanceof DistributedFileSystem)) {
|
||||
throw new IOException("Wrong file system: " + fs.getClass().getName());
|
||||
}
|
||||
|
||||
if (recover) {
|
||||
waitForLeaseRecovery(fs, new Path(dir, "log"));
|
||||
}
|
||||
|
||||
SequenceFile.Reader in = null;
|
||||
try {
|
||||
in = new SequenceFile.Reader(fs, new Path(dir, "log"), conf);
|
||||
} catch (EOFException e) {
|
||||
if (expected != 0) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
IntWritable key = new IntWritable();
|
||||
BytesWritable value = new BytesWritable();
|
||||
int count = 0;
|
||||
IOException ex = null;
|
||||
try {
|
||||
while (in.next(key, value)) {
|
||||
count++;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
ex = e;
|
||||
}
|
||||
if (expected != count) {
|
||||
System.err.println("Read " + count + " lines, expected " + expected +
|
||||
" lines");
|
||||
}
|
||||
in.close();
|
||||
if (ex != null) {
|
||||
ex.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
static void waitForLeaseRecovery(FileSystem fs, Path file) {
|
||||
boolean done = false;
|
||||
while (!done) {
|
||||
try {
|
||||
Thread.sleep(10*1000);
|
||||
} catch (InterruptedException e) {
|
||||
System.out.println("Sleep interrupted.");
|
||||
}
|
||||
try {
|
||||
FSDataOutputStream out = fs.append(file);
|
||||
out.close();
|
||||
done = true;
|
||||
} catch (IOException e) {
|
||||
System.out.println("Triggering lease recovery if needed.");
|
||||
}
|
||||
}
|
||||
System.out.println("Lease Recovery Successful");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,93 @@
|
|||
/**
|
||||
* Copyright 2009 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.io;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.io.IntWritable;
|
||||
import org.apache.hadoop.io.BytesWritable;
|
||||
import org.apache.hadoop.io.compress.DefaultCodec;
|
||||
import org.apache.hadoop.hbase.io.SequenceFile;
|
||||
|
||||
/** Writes to a Sequence file and then commits suicide */
|
||||
public class Writer {
|
||||
private static byte[] bytes = new byte[1020];
|
||||
private static BytesWritable value;
|
||||
|
||||
/**
|
||||
* @param args
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void main(String[] args) throws IOException {
|
||||
if (args.length != 3) {
|
||||
System.err.println("usage: Writer total-writes writes-per-sync block-size-mb");
|
||||
return;
|
||||
}
|
||||
long blocksize = Long.valueOf(args[2]);
|
||||
if (blocksize != 1L && blocksize != 64L) {
|
||||
System.err.println("Only 1MB and 64MB blocksizes are allowed");
|
||||
return;
|
||||
}
|
||||
blocksize *= 1024L * 1024L;
|
||||
|
||||
for (int i = 0; i < bytes.length; i++) {
|
||||
bytes[i] = (byte)(i % 64);
|
||||
}
|
||||
value = new BytesWritable(bytes);
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
Path dir = new Path(conf.get("fs.default.name"), "log");
|
||||
conf.set("fs.default.name", dir.toString());
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
if (!(fs instanceof DistributedFileSystem)) {
|
||||
throw new IOException("Wrong file system: " + fs.getClass().getName());
|
||||
}
|
||||
|
||||
fs.mkdirs(dir);
|
||||
Path file = new Path(dir, "log");
|
||||
if (fs.exists(file)) {
|
||||
fs.delete(file, false);
|
||||
}
|
||||
|
||||
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, file,
|
||||
IntWritable.class, BytesWritable.class,
|
||||
fs.getConf().getInt("io.file.buffer.size", 4096),
|
||||
fs.getDefaultReplication(), blocksize,
|
||||
SequenceFile.CompressionType.NONE, new DefaultCodec(), null,
|
||||
new SequenceFile.Metadata());
|
||||
|
||||
int totalWrites = Integer.valueOf(args[0]);
|
||||
int writesPerSync = Integer.valueOf(args[1]);
|
||||
for (int i = 1; i <= totalWrites; i++) {
|
||||
writer.append(new IntWritable(i), value);
|
||||
if (i % writesPerSync == 0) {
|
||||
writer.syncFs();
|
||||
}
|
||||
}
|
||||
|
||||
// The following *should* prevent hdfs shutdown hook from running
|
||||
Runtime.getRuntime().halt(-1);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue