mirror of https://github.com/apache/activemq.git
moved StoreByteArray stream methods to util
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@470391 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
166a04bc58
commit
ae687a1182
|
@ -1,299 +0,0 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.kaha.impl.data;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.UTFDataFormatException;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
/**
|
||||
* Optimized ByteArrayInputStream that can be used more than once
|
||||
*
|
||||
* @version $Revision: 1.1.1.1 $
|
||||
*/
|
||||
public final class StoreByteArrayInputStream extends InputStream implements DataInput{
|
||||
private byte[] buf;
|
||||
private int pos;
|
||||
private int offset;
|
||||
|
||||
/**
|
||||
* Creates a <code>StoreByteArrayInputStream</code>.
|
||||
*
|
||||
* @param buf the input buffer.
|
||||
*/
|
||||
public StoreByteArrayInputStream(byte buf[]){
|
||||
this.buf=buf;
|
||||
this.pos=0;
|
||||
this.offset = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a <code>StoreByteArrayInputStream</code>.
|
||||
*
|
||||
* @param sequence the input buffer.
|
||||
*/
|
||||
public StoreByteArrayInputStream(ByteSequence sequence){
|
||||
this.buf=sequence.getData();
|
||||
this.offset=this.pos=sequence.getOffset();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates <code>WireByteArrayInputStream</code> with a minmalist byte array
|
||||
*/
|
||||
public StoreByteArrayInputStream(){
|
||||
this(new byte[0]);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return the size
|
||||
*/
|
||||
public int size(){
|
||||
return pos-offset;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the underlying data array
|
||||
*/
|
||||
public byte[] getRawData(){
|
||||
return buf;
|
||||
}
|
||||
|
||||
/**
|
||||
* reset the <code>StoreByteArrayInputStream</code> to use an new byte array
|
||||
*
|
||||
* @param newBuff
|
||||
*/
|
||||
public void restart(byte[] newBuff){
|
||||
buf=newBuff;
|
||||
pos=0;
|
||||
}
|
||||
|
||||
/**
|
||||
* reset the <code>StoreByteArrayInputStream</code> to use an new ByteSequence
|
||||
* @param sequence
|
||||
*
|
||||
*/
|
||||
public void restart(ByteSequence sequence){
|
||||
this.buf=sequence.getData();
|
||||
this.pos=sequence.getOffset();
|
||||
}
|
||||
|
||||
/**
|
||||
* re-start the input stream - reusing the current buffer
|
||||
*
|
||||
* @param size
|
||||
*/
|
||||
public void restart(int size){
|
||||
if(buf==null||buf.length<size){
|
||||
buf=new byte[size];
|
||||
}
|
||||
restart(buf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads the next byte of data from this input stream. The value byte is returned as an <code>int</code> in the
|
||||
* range <code>0</code> to <code>255</code>. If no byte is available because the end of the stream has been
|
||||
* reached, the value <code>-1</code> is returned.
|
||||
* <p>
|
||||
* This <code>read</code> method cannot block.
|
||||
*
|
||||
* @return the next byte of data, or <code>-1</code> if the end of the stream has been reached.
|
||||
*/
|
||||
public int read(){
|
||||
return (pos<buf.length)?(buf[pos++]&0xff):-1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads up to <code>len</code> bytes of data into an array of bytes from this input stream.
|
||||
*
|
||||
* @param b the buffer into which the data is read.
|
||||
* @param off the start offset of the data.
|
||||
* @param len the maximum number of bytes read.
|
||||
* @return the total number of bytes read into the buffer, or <code>-1</code> if there is no more data because the
|
||||
* end of the stream has been reached.
|
||||
*/
|
||||
public int read(byte b[],int off,int len){
|
||||
if(b==null){
|
||||
throw new NullPointerException();
|
||||
}
|
||||
if(pos>=buf.length){
|
||||
return -1;
|
||||
}
|
||||
if(pos+len>buf.length){
|
||||
len=buf.length-pos;
|
||||
}
|
||||
if(len<=0){
|
||||
return 0;
|
||||
}
|
||||
System.arraycopy(buf,pos,b,off,len);
|
||||
pos+=len;
|
||||
return len;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of bytes that can be read from the input stream without blocking.
|
||||
*/
|
||||
public int available(){
|
||||
return buf.length-pos;
|
||||
}
|
||||
|
||||
public void readFully(byte[] b){
|
||||
read(b,0,b.length);
|
||||
}
|
||||
|
||||
public void readFully(byte[] b,int off,int len){
|
||||
read(b,off,len);
|
||||
}
|
||||
|
||||
public int skipBytes(int n){
|
||||
if(pos+n>buf.length){
|
||||
n=buf.length-pos;
|
||||
}
|
||||
if(n<0){
|
||||
return 0;
|
||||
}
|
||||
pos+=n;
|
||||
return n;
|
||||
}
|
||||
|
||||
public boolean readBoolean(){
|
||||
return read()!=0;
|
||||
}
|
||||
|
||||
public byte readByte(){
|
||||
return (byte) read();
|
||||
}
|
||||
|
||||
public int readUnsignedByte(){
|
||||
return read();
|
||||
}
|
||||
|
||||
public short readShort(){
|
||||
int ch1=read();
|
||||
int ch2=read();
|
||||
return (short) ((ch1<<8)+(ch2<<0));
|
||||
}
|
||||
|
||||
public int readUnsignedShort(){
|
||||
int ch1=read();
|
||||
int ch2=read();
|
||||
return ((ch1<<8)+(ch2<<0));
|
||||
}
|
||||
|
||||
public char readChar(){
|
||||
int ch1=read();
|
||||
int ch2=read();
|
||||
return (char) ((ch1<<8)+(ch2<<0));
|
||||
}
|
||||
|
||||
public int readInt(){
|
||||
int ch1=read();
|
||||
int ch2=read();
|
||||
int ch3=read();
|
||||
int ch4=read();
|
||||
return ((ch1<<24)+(ch2<<16)+(ch3<<8)+(ch4<<0));
|
||||
}
|
||||
|
||||
public long readLong(){
|
||||
return (((long) buf[pos++]<<56)+((long) (buf[pos++]&255)<<48)+((long) (buf[pos++]&255)<<40)
|
||||
+((long) (buf[pos++]&255)<<32)+((long) (buf[pos++]&255)<<24)+((buf[pos++]&255)<<16)
|
||||
+((buf[pos++]&255)<<8)+((buf[pos++]&255)<<0));
|
||||
}
|
||||
|
||||
public float readFloat() throws IOException{
|
||||
return Float.intBitsToFloat(readInt());
|
||||
}
|
||||
|
||||
public double readDouble() throws IOException{
|
||||
return Double.longBitsToDouble(readLong());
|
||||
}
|
||||
|
||||
public String readLine(){
|
||||
int start=pos;
|
||||
while(pos<buf.length){
|
||||
int c=read();
|
||||
if(c=='\n'){
|
||||
break;
|
||||
}
|
||||
if(c=='\r'){
|
||||
c=read();
|
||||
if(c!='\n'&&c!=-1){
|
||||
pos--;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
return new String(buf,start,pos);
|
||||
}
|
||||
|
||||
public String readUTF() throws IOException{
|
||||
int length=readUnsignedShort();
|
||||
char[] characters=new char[length];
|
||||
int c,c2,c3;
|
||||
int count=0;
|
||||
int total=pos+length;
|
||||
while(pos<total){
|
||||
c=(int) buf[pos]&0xff;
|
||||
if(c>127)
|
||||
break;
|
||||
pos++;
|
||||
characters[count++]=(char) c;
|
||||
}
|
||||
while(pos<total){
|
||||
c=(int) buf[pos]&0xff;
|
||||
switch(c>>4){
|
||||
case 0:
|
||||
case 1:
|
||||
case 2:
|
||||
case 3:
|
||||
case 4:
|
||||
case 5:
|
||||
case 6:
|
||||
case 7:
|
||||
pos++;
|
||||
characters[count++]=(char) c;
|
||||
break;
|
||||
case 12:
|
||||
case 13:
|
||||
pos+=2;
|
||||
if(pos>length)
|
||||
throw new UTFDataFormatException("bad string");
|
||||
c2=(int) buf[pos-1];
|
||||
if((c2&0xC0)!=0x80)
|
||||
throw new UTFDataFormatException("bad string");
|
||||
characters[count++]=(char) (((c&0x1F)<<6)|(c2&0x3F));
|
||||
break;
|
||||
case 14:
|
||||
pos+=3;
|
||||
if(pos>length)
|
||||
throw new UTFDataFormatException("bad string");
|
||||
c2=(int) buf[pos-2];
|
||||
c3=(int) buf[pos-1];
|
||||
if(((c2&0xC0)!=0x80)||((c3&0xC0)!=0x80))
|
||||
throw new UTFDataFormatException("bad string");
|
||||
characters[count++]=(char) (((c&0x0F)<<12)|((c2&0x3F)<<6)|((c3&0x3F)<<0));
|
||||
break;
|
||||
default:
|
||||
throw new UTFDataFormatException("bad string");
|
||||
}
|
||||
}
|
||||
return new String(characters,0,count);
|
||||
}
|
||||
}
|
|
@ -1,245 +0,0 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.kaha.impl.data;
|
||||
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.UTFDataFormatException;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
/**
|
||||
* Optimized ByteArrayOutputStream
|
||||
*
|
||||
* @version $Revision: 1.1.1.1 $
|
||||
*/
|
||||
public final class StoreByteArrayOutputStream extends OutputStream implements DataOutput{
|
||||
private byte buf[];
|
||||
private int pos;
|
||||
|
||||
/**
|
||||
* Creates a new byte array output stream, with a buffer capacity of the specified size, in bytes.
|
||||
*
|
||||
* @param size the initial size.
|
||||
* @exception IllegalArgumentException if size is negative.
|
||||
*/
|
||||
public StoreByteArrayOutputStream(int size){
|
||||
if(size<0){
|
||||
throw new IllegalArgumentException("Invalid size: "+size);
|
||||
}
|
||||
buf=new byte[size];
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new byte array output stream.
|
||||
*/
|
||||
public StoreByteArrayOutputStream(){
|
||||
this(16*1024);
|
||||
}
|
||||
|
||||
/**
|
||||
* start using a fresh byte array
|
||||
*
|
||||
* @param size
|
||||
*/
|
||||
public void restart(int size){
|
||||
buf=new byte[size];
|
||||
pos=0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a ByteSequence from the stream
|
||||
* @return the byte sequence
|
||||
*/
|
||||
public ByteSequence toByteSequence() {
|
||||
return new ByteSequence(buf, 0, pos);
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes the specified byte to this byte array output stream.
|
||||
*
|
||||
* @param b the byte to be written.
|
||||
*/
|
||||
public void write(int b){
|
||||
int newcount=pos+1;
|
||||
ensureEnoughBuffer(newcount);
|
||||
buf[pos]=(byte) b;
|
||||
pos=newcount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes <code>len</code> bytes from the specified byte array starting at offset <code>off</code> to this byte
|
||||
* array output stream.
|
||||
*
|
||||
* @param b the data.
|
||||
* @param off the start offset in the data.
|
||||
* @param len the number of bytes to write.
|
||||
*/
|
||||
public void write(byte b[],int off,int len){
|
||||
if(len==0){
|
||||
return;
|
||||
}
|
||||
int newcount=pos+len;
|
||||
ensureEnoughBuffer(newcount);
|
||||
System.arraycopy(b,off,buf,pos,len);
|
||||
pos=newcount;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the underlying byte[] buffer
|
||||
*/
|
||||
public byte[] getData(){
|
||||
return buf;
|
||||
}
|
||||
|
||||
/**
|
||||
* reset the output stream
|
||||
*/
|
||||
public void reset(){
|
||||
pos=0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the current position for writing
|
||||
*
|
||||
* @param offset
|
||||
*/
|
||||
public void position(int offset){
|
||||
ensureEnoughBuffer(offset);
|
||||
pos=offset;
|
||||
}
|
||||
|
||||
public int size(){
|
||||
return pos;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void writeBoolean(boolean v){
|
||||
ensureEnoughBuffer(1);
|
||||
buf[pos++]=(byte) (v?1:0);
|
||||
}
|
||||
|
||||
public void writeByte(int v){
|
||||
ensureEnoughBuffer(1);
|
||||
buf[pos++]=(byte) (v>>>0);
|
||||
}
|
||||
|
||||
public void writeShort(int v){
|
||||
ensureEnoughBuffer(2);
|
||||
buf[pos++]=(byte) (v>>>8);
|
||||
buf[pos++]=(byte) (v>>>0);
|
||||
}
|
||||
|
||||
public void writeChar(int v){
|
||||
ensureEnoughBuffer(2);
|
||||
buf[pos++]=(byte) (v>>>8);
|
||||
buf[pos++]=(byte) (v>>>0);
|
||||
}
|
||||
|
||||
public void writeInt(int v){
|
||||
ensureEnoughBuffer(4);
|
||||
buf[pos++]=(byte) (v>>>24);
|
||||
buf[pos++]=(byte) (v>>>16);
|
||||
buf[pos++]=(byte) (v>>>8);
|
||||
buf[pos++]=(byte) (v>>>0);
|
||||
}
|
||||
|
||||
public void writeLong(long v){
|
||||
ensureEnoughBuffer(8);
|
||||
buf[pos++]=(byte) (v>>>56);
|
||||
buf[pos++]=(byte) (v>>>48);
|
||||
buf[pos++]=(byte) (v>>>40);
|
||||
buf[pos++]=(byte) (v>>>32);
|
||||
buf[pos++]=(byte) (v>>>24);
|
||||
buf[pos++]=(byte) (v>>>16);
|
||||
buf[pos++]=(byte) (v>>>8);
|
||||
buf[pos++]=(byte) (v>>>0);
|
||||
}
|
||||
|
||||
public void writeFloat(float v) throws IOException{
|
||||
writeInt(Float.floatToIntBits(v));
|
||||
}
|
||||
|
||||
public void writeDouble(double v) throws IOException{
|
||||
writeLong(Double.doubleToLongBits(v));
|
||||
}
|
||||
|
||||
public void writeBytes(String s){
|
||||
int length=s.length();
|
||||
for(int i=0;i<length;i++){
|
||||
write((byte) s.charAt(i));
|
||||
}
|
||||
}
|
||||
|
||||
public void writeChars(String s){
|
||||
int length=s.length();
|
||||
for(int i=0;i<length;i++){
|
||||
int c=s.charAt(i);
|
||||
write((c>>>8)&0xFF);
|
||||
write((c>>>0)&0xFF);
|
||||
}
|
||||
}
|
||||
|
||||
public void writeUTF(String str) throws IOException{
|
||||
int strlen=str.length();
|
||||
int encodedsize=0;
|
||||
int c;
|
||||
for(int i=0;i<strlen;i++){
|
||||
c=str.charAt(i);
|
||||
if((c>=0x0001)&&(c<=0x007F)){
|
||||
encodedsize++;
|
||||
}else if(c>0x07FF){
|
||||
encodedsize+=3;
|
||||
}else{
|
||||
encodedsize+=2;
|
||||
}
|
||||
}
|
||||
if(encodedsize>65535)
|
||||
throw new UTFDataFormatException("encoded string too long: "+encodedsize+" bytes");
|
||||
ensureEnoughBuffer(encodedsize+2);
|
||||
writeShort(encodedsize);
|
||||
int i=0;
|
||||
for(i=0;i<strlen;i++){
|
||||
c=str.charAt(i);
|
||||
if(!((c>=0x0001)&&(c<=0x007F)))
|
||||
break;
|
||||
buf[pos++]=(byte) c;
|
||||
}
|
||||
for(;i<strlen;i++){
|
||||
c=str.charAt(i);
|
||||
if((c>=0x0001)&&(c<=0x007F)){
|
||||
buf[pos++]=(byte) c;
|
||||
}else if(c>0x07FF){
|
||||
buf[pos++]=(byte) (0xE0|((c>>12)&0x0F));
|
||||
buf[pos++]=(byte) (0x80|((c>>6)&0x3F));
|
||||
buf[pos++]=(byte) (0x80|((c>>0)&0x3F));
|
||||
}else{
|
||||
buf[pos++]=(byte) (0xC0|((c>>6)&0x1F));
|
||||
buf[pos++]=(byte) (0x80|((c>>0)&0x3F));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void ensureEnoughBuffer(int newcount){
|
||||
if(newcount>buf.length){
|
||||
byte newbuf[]=new byte[Math.max(buf.length<<1,newcount)];
|
||||
System.arraycopy(buf,0,newbuf,0,pos);
|
||||
buf=newbuf;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
|||
import java.io.RandomAccessFile;
|
||||
import org.apache.activemq.kaha.Marshaller;
|
||||
import org.apache.activemq.kaha.StoreLocation;
|
||||
import org.apache.activemq.util.DataByteArrayInputStream;
|
||||
/**
|
||||
* Optimized Store reader
|
||||
*
|
||||
|
@ -29,7 +30,7 @@ import org.apache.activemq.kaha.StoreLocation;
|
|||
final class StoreDataReader{
|
||||
|
||||
private DataManager dataManager;
|
||||
private StoreByteArrayInputStream dataIn;
|
||||
private DataByteArrayInputStream dataIn;
|
||||
|
||||
/**
|
||||
* Construct a Store reader
|
||||
|
@ -38,7 +39,7 @@ final class StoreDataReader{
|
|||
*/
|
||||
StoreDataReader(DataManager fileManager){
|
||||
this.dataManager=fileManager;
|
||||
this.dataIn=new StoreByteArrayInputStream();
|
||||
this.dataIn=new DataByteArrayInputStream();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.io.RandomAccessFile;
|
|||
|
||||
import org.apache.activemq.kaha.Marshaller;
|
||||
import org.apache.activemq.kaha.StoreLocation;
|
||||
import org.apache.activemq.util.DataByteArrayOutputStream;
|
||||
/**
|
||||
* Optimized Store writer
|
||||
*
|
||||
|
@ -30,7 +31,7 @@ import org.apache.activemq.kaha.StoreLocation;
|
|||
*/
|
||||
final class StoreDataWriter{
|
||||
|
||||
private StoreByteArrayOutputStream buffer;
|
||||
private DataByteArrayOutputStream buffer;
|
||||
private DataManager dataManager;
|
||||
|
||||
|
||||
|
@ -41,7 +42,7 @@ final class StoreDataWriter{
|
|||
*/
|
||||
StoreDataWriter(DataManager fileManager){
|
||||
this.dataManager=fileManager;
|
||||
this.buffer=new StoreByteArrayOutputStream();
|
||||
this.buffer=new DataByteArrayOutputStream();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -20,7 +20,7 @@ package org.apache.activemq.kaha.impl.index;
|
|||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
|
||||
import org.apache.activemq.kaha.impl.data.StoreByteArrayInputStream;
|
||||
import org.apache.activemq.util.DataByteArrayInputStream;
|
||||
/**
|
||||
* Optimized Store reader
|
||||
*
|
||||
|
@ -28,7 +28,7 @@ import org.apache.activemq.kaha.impl.data.StoreByteArrayInputStream;
|
|||
*/
|
||||
class StoreIndexReader{
|
||||
protected RandomAccessFile file;
|
||||
protected StoreByteArrayInputStream dataIn;
|
||||
protected DataByteArrayInputStream dataIn;
|
||||
protected byte[] buffer=new byte[IndexItem.INDEX_SIZE];
|
||||
|
||||
/**
|
||||
|
@ -38,7 +38,7 @@ class StoreIndexReader{
|
|||
*/
|
||||
StoreIndexReader(RandomAccessFile file){
|
||||
this.file=file;
|
||||
this.dataIn=new StoreByteArrayInputStream();
|
||||
this.dataIn=new DataByteArrayInputStream();
|
||||
}
|
||||
|
||||
protected IndexItem readItem(long offset) throws IOException{
|
||||
|
|
|
@ -21,7 +21,7 @@ import java.io.IOException;
|
|||
import java.io.RandomAccessFile;
|
||||
|
||||
import org.apache.activemq.kaha.impl.data.DataManager;
|
||||
import org.apache.activemq.kaha.impl.data.StoreByteArrayOutputStream;
|
||||
import org.apache.activemq.util.DataByteArrayOutputStream;
|
||||
/**
|
||||
* Optimized Store writer
|
||||
*
|
||||
|
@ -29,7 +29,7 @@ import org.apache.activemq.kaha.impl.data.StoreByteArrayOutputStream;
|
|||
*/
|
||||
class StoreIndexWriter{
|
||||
|
||||
protected final StoreByteArrayOutputStream dataOut = new StoreByteArrayOutputStream();
|
||||
protected final DataByteArrayOutputStream dataOut = new DataByteArrayOutputStream();
|
||||
protected final RandomAccessFile file;
|
||||
protected final String name;
|
||||
protected final DataManager redoLog;
|
||||
|
|
Loading…
Reference in New Issue