Changeset 29484 in osm for applications/editors/josm/plugins/imagerycache/src/org/mapdb/Volume.java
- Timestamp:
- 2013-04-07T17:07:27+02:00 (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
applications/editors/josm/plugins/imagerycache/src/org/mapdb/Volume.java
r29363 r29484 24 24 import java.nio.ByteBuffer; 25 25 import java.nio.MappedByteBuffer; 26 import java.nio.channels.AsynchronousFileChannel;27 26 import java.nio.channels.FileChannel; 28 import java.nio.file.StandardOpenOption;29 27 import java.util.Arrays; 28 import java.util.Map; 29 import java.util.WeakHashMap; 30 30 import java.util.concurrent.ExecutionException; 31 31 import java.util.concurrent.Future; … … 34 34 35 35 /** 36 * MapDB abstraction over raw storage (file, disk partition, memory etc...) 36 * MapDB abstraction over raw storage (file, disk partition, memory etc...). 37 * <p/> 38 * Implementations needs to be thread safe (especially 39 'ensureAvailable') operation. 40 * However updates do not have to be atomic, it is clients responsibility 41 * to ensure two threads are not writing/reading into the same location. 37 42 * 38 43 * @author Jan Kotek … … 41 46 42 47 public static final int BUF_SIZE = 1<<30; 43 public static final int INITIAL_SIZE = 1024*32;44 48 45 49 abstract public void ensureAvailable(final long offset); … … 49 53 abstract public void putByte(final long offset, final byte value); 50 54 51 abstract public void putData(final long offset, final byte[] value, int size);55 abstract public void putData(final long offset, final byte[] src, int srcPos, int srcSize); 52 56 abstract public void putData(final long offset, final ByteBuffer buf); 53 57 … … 88 92 putByte(offset, (byte)(b & 0xff)); 89 93 } 94 95 /** 96 * Reads a long from the indicated position 97 */ 98 public final long getSixLong(long pos) { 99 return 100 ((long) (getByte(pos + 0) & 0xff) << 40) | 101 ((long) (getByte(pos + 1) & 0xff) << 32) | 102 ((long) (getByte(pos + 2) & 0xff) << 24) | 103 ((long) (getByte(pos + 3) & 0xff) << 16) | 104 ((long) (getByte(pos + 4) & 0xff) << 8) | 105 ((long) (getByte(pos + 5) & 0xff) << 0); 106 } 107 108 /** 109 * Writes a long to the indicated position 110 */ 111 public final void putSixLong(long pos, long value) { 112 if(value<0) throw new IllegalArgumentException(); 113 if(value >> (6*8)!=0) 114 throw new IllegalArgumentException("does not fit"); 115 //TODO read/write as integer+short, might be faster 116 putByte(pos + 0, (byte) (0xff & (value >> 40))); 117 putByte(pos + 1, (byte) (0xff & (value >> 32))); 118 putByte(pos + 2, (byte) (0xff & (value >> 24))); 119 putByte(pos + 3, (byte) (0xff & (value >> 16))); 120 putByte(pos + 4, (byte) (0xff & (value >> 8))); 121 putByte(pos + 5, (byte) (0xff & (value >> 0))); 122 123 } 124 90 125 91 126 /** returns underlying file if it exists */ … … 111 146 public static Factory fileFactory(final boolean readOnly, final boolean RAF, final File indexFile){ 112 147 return fileFactory(readOnly, RAF, indexFile, 113 new File(indexFile.getPath() + Stor ageDirect.DATA_FILE_EXT),114 new File(indexFile.getPath() + Stor ageJournaled.TRANS_LOG_FILE_EXT));148 new File(indexFile.getPath() + StoreDirect.DATA_FILE_EXT), 149 new File(indexFile.getPath() + StoreWAL.TRANS_LOG_FILE_EXT)); 115 150 } 116 151 … … 142 177 return new Factory() { 143 178 144 @Override public Volume createIndexVolume() { 179 @Override public synchronized Volume createIndexVolume() { 145 180 return new MemoryVol(useDirectBuffer); 146 181 } 147 182 148 @Override public Volume createPhysVolume() { 183 @Override public synchronized Volume createPhysVolume() { 149 184 return new MemoryVol(useDirectBuffer); 150 185 } 151 186 152 @Override public Volume createTransLogVolume() { 187 @Override public synchronized Volume createTransLogVolume() { 153 188 return new MemoryVol(useDirectBuffer); 154 189 } … … 181 216 //check for most common case, this is already mapped 182 217 if(buffersPos<buffers.length && buffers[buffersPos]!=null && 183 buffers[buffersPos].capacity()>=offset% BUF_SIZE) 218 buffers[buffersPos].capacity()>=offset% BUF_SIZE){ 184 219 return; 220 } 185 221 186 222 growLock.lock(); … … 191 227 return; 192 228 229 ByteBuffer[] buffers2 = buffers; 193 230 194 231 //grow array if necessary 195 if(buffersPos>=buffers.length){ 196 buffers = Arrays.copyOf(buffers, Math.max(buffersPos, buffers.length * 2)); 197 } 232 if(buffersPos>=buffers2.length){ 233 buffers2 = Arrays.copyOf(buffers2, Math.max(buffersPos+1, buffers2.length * 2)); 234 } 235 198 236 199 237 //just remap file buffer 200 ByteBuffer newBuf = makeNewBuffer(offset); 238 if( buffers2[buffersPos] == null){ 239 //make sure previous buffer is fully expanded 240 if(buffersPos>0){ 241 ByteBuffer oldPrev = buffers2[buffersPos-1]; 242 if(oldPrev == null || oldPrev.capacity()!=BUF_SIZE){ 243 buffers2[buffersPos-1] = makeNewBuffer(1L*buffersPos*BUF_SIZE-1,buffers2); 244 } 245 } 246 } 247 248 249 ByteBuffer newBuf = makeNewBuffer(offset, buffers2); 201 250 if(readOnly) 202 251 newBuf = newBuf.asReadOnlyBuffer(); 203 252 204 buffers[buffersPos] = newBuf; 253 buffers2[buffersPos] = newBuf; 254 255 buffers = buffers2; 205 256 }finally{ 206 257 growLock.unlock(); … … 208 259 } 209 260 210 protected abstract ByteBuffer makeNewBuffer(long offset); 261 protected abstract ByteBuffer makeNewBuffer(long offset, ByteBuffer[] buffers2); 211 262 212 263 protected final ByteBuffer internalByteBuffer(long offset) { … … 233 284 234 285 235 @Override public finalvoid putData(final long offset, final byte[]value, final int size){286 @Override public void putData(final long offset, final byte[] src, int srcPos, int srcSize){ 236 287 final ByteBuffer b1 = internalByteBuffer(offset); 237 288 final int bufPos = (int) (offset% BUF_SIZE); … … 239 290 synchronized (b1){ 240 291 b1.position(bufPos); 241 b1.put( value, 0, size);292 b1.put(src, srcPos, srcSize); 242 293 } 243 294 } … … 264 315 try{ 265 316 return internalByteBuffer(offset).getInt((int) (offset% BUF_SIZE)); 317 } catch (NullPointerException e) { 318 throw new RuntimeException(""+offset,e); 319 266 320 }catch(IndexOutOfBoundsException e){ 267 321 throw new IOError(new EOFException()); … … 346 400 protected final FileChannel.MapMode mapMode; 347 401 protected final java.io.RandomAccessFile raf; 402 403 protected final Map<ByteBuffer, String> unreleasedBuffers = 404 Utils.isWindows() ? new WeakHashMap<ByteBuffer, String>() : null; 348 405 349 406 static final int BUF_SIZE_INC = 1024*1024; … … 370 427 }else{ 371 428 buffers = new ByteBuffer[1]; 372 buffers[0] = fileChannel.map(mapMode, 0, INITIAL_SIZE); 373 if(mapMode == FileChannel.MapMode.READ_ONLY) 374 buffers[0] = buffers[0].asReadOnlyBuffer(); 429 // buffers[0] = fileChannel.map(mapMode, 0, INITIAL_SIZE); 430 // if(mapMode == FileChannel.MapMode.READ_ONLY) 431 // buffers[0] = buffers[0].asReadOnlyBuffer(); 375 432 376 433 } … … 394 451 } 395 452 buffers = null; 453 if(unreleasedBuffers!=null){ 454 for(ByteBuffer b:unreleasedBuffers.keySet().toArray(new MappedByteBuffer[0])){ 455 if(b!=null && (b instanceof MappedByteBuffer)){ 456 unmap((MappedByteBuffer) b); 457 } 458 } 459 } 460 396 461 } catch (IOException e) { 397 462 throw new IOError(e); … … 428 493 429 494 @Override 430 protected ByteBuffer makeNewBuffer(long offset) { 431 try { 432 //unmap old buffer on windows 433 int bufPos = (int) (offset/BUF_SIZE); 434 if(bufPos<buffers.length && buffers[bufPos]!=null){ 435 unmap((MappedByteBuffer) buffers[bufPos]); 436 buffers[bufPos] = null; 437 } 438 495 protected ByteBuffer makeNewBuffer(long offset, ByteBuffer[] buffers2) { 496 try { 439 497 long newBufSize = offset% BUF_SIZE; 440 498 newBufSize = newBufSize + newBufSize%BUF_SIZE_INC; //round to BUF_SIZE_INC 441 returnfileChannel.map(442 mapMode,443 offset - offset% BUF_SIZE, newBufSize );499 ByteBuffer buf = fileChannel.map( mapMode, offset - offset% BUF_SIZE, newBufSize ); 500 if(unreleasedBuffers!=null) unreleasedBuffers.put(buf, ""); 501 return buf; 444 502 } catch (IOException e) { 445 503 if(e.getCause()!=null && e.getCause() instanceof OutOfMemoryError){ … … 463 521 super(false); 464 522 this.useDirectBuffer = useDirectBuffer; 465 ByteBuffer b0 = useDirectBuffer? 466 ByteBuffer.allocateDirect(INITIAL_SIZE) : 467 ByteBuffer.allocate(INITIAL_SIZE); 468 buffers = new ByteBuffer[]{b0}; 469 } 470 471 @Override protected ByteBuffer makeNewBuffer(long offset) { 523 // ByteBuffer b0 = useDirectBuffer? 524 // ByteBuffer.allocateDirect(INITIAL_SIZE) : 525 // ByteBuffer.allocate(INITIAL_SIZE); 526 // buffers = new ByteBuffer[]{b0}; 527 buffers=new ByteBuffer[1]; 528 } 529 530 @Override protected ByteBuffer makeNewBuffer(long offset, ByteBuffer[] buffers2) { 472 531 final int newBufSize = Utils.nextPowTwo((int) (offset % BUF_SIZE)); 473 532 //double size of existing in-memory-buffer … … 476 535 ByteBuffer.allocate(newBufSize); 477 536 final int buffersPos = (int) (offset/ BUF_SIZE); 478 final ByteBuffer oldBuffer = buffers[buffersPos]; 537 final ByteBuffer oldBuffer = buffers2[buffersPos]; 479 538 if(oldBuffer!=null){ 480 539 //copy old buffer if it exists … … 580 639 581 640 @Override 582 synchronized public void putData(long offset, byte[] value, int size){583 try { 584 if(pos!=offset){ 585 raf.seek(offset); 586 } 587 pos=offset+size; 588 raf.write( value,0,size);641 synchronized public void putData(final long offset, final byte[] src, int srcPos, int srcSize){ 642 try { 643 if(pos!=offset){ 644 raf.seek(offset); 645 } 646 pos=offset+srcSize; 647 raf.write(src,srcPos,srcSize); 589 648 } catch (IOException e) { 590 649 throw new IOError(e); … … 602 661 byte[] b = new byte[size]; 603 662 buf.get(b); 604 putData(offset, b, size); 663 putData(offset, b, 0, size); 605 664 } catch (IOException e) { 606 665 throw new IOError(e); … … 708 767 } 709 768 710 public static class AsyncFileChannelVol extends Volume{711 712 713 protected AsynchronousFileChannel channel;714 protected final boolean readOnly;715 protected final File file;716 717 public AsyncFileChannelVol(File file, boolean readOnly){718 this.readOnly = readOnly;719 this.file = file;720 try {721 this.channel = readOnly?722 AsynchronousFileChannel.open(file.toPath(),StandardOpenOption.READ):723 AsynchronousFileChannel.open(file.toPath(),StandardOpenOption.READ, StandardOpenOption.WRITE);724 725 } catch (IOException e) {726 throw new IOError(e);727 }728 }729 730 @Override731 public void ensureAvailable(long offset) {732 //we do not have a list of ByteBuffers, so ensure size does not have to do anything733 }734 735 736 737 protected void await(Future<Integer> future, int size) {738 try {739 int res = future.get();740 if(res!=size) throw new InternalError("not enough bytes");741 } catch (InterruptedException e) {742 throw new RuntimeException(e);743 } catch (ExecutionException e) {744 throw new RuntimeException(e);745 }746 }747 748 @Override749 public void putByte(long offset, byte value) {750 ByteBuffer b = ByteBuffer.allocate(1);751 b.put(0, value);752 await(channel.write(b, offset),1);753 }754 @Override755 public void putInt(long offset, int value) {756 ByteBuffer b = ByteBuffer.allocate(4);757 b.putInt(0, value);758 await(channel.write(b, offset),4);759 }760 761 @Override762 public void putLong(long offset, long value) {763 ByteBuffer b = ByteBuffer.allocate(8);764 b.putLong(0, value);765 await(channel.write(b, offset),8);766 }767 768 @Override769 public void putData(long offset, byte[] value, int size) {770 ByteBuffer b = ByteBuffer.wrap(value);771 b.limit(size);772 await(channel.write(b,offset),size);773 }774 775 @Override776 public void putData(long offset, ByteBuffer buf) {777 await(channel.write(buf,offset), buf.limit() - buf.position());778 }779 780 781 782 @Override783 public long getLong(long offset) {784 ByteBuffer b = ByteBuffer.allocate(8);785 await(channel.read(b, offset), 8);786 b.rewind();787 return b.getLong();788 }789 790 @Override791 public byte getByte(long offset) {792 ByteBuffer b = ByteBuffer.allocate(1);793 await(channel.read(b, offset), 1);794 b.rewind();795 return b.get();796 }797 798 @Override799 public int getInt(long offset) {800 ByteBuffer b = ByteBuffer.allocate(4);801 await(channel.read(b, offset), 4);802 b.rewind();803 return b.getInt();804 }805 806 807 808 @Override809 public DataInput2 getDataInput(long offset, int size) {810 ByteBuffer b = ByteBuffer.allocate(size);811 await(channel.read(b, offset), size);812 b.rewind();813 return new DataInput2(b,0);814 }815 816 @Override817 public void close() {818 try {819 channel.close();820 } catch (IOException e) {821 throw new IOError(e);822 }823 }824 825 @Override826 public void sync() {827 try {828 channel.force(true);829 } catch (IOException e) {830 throw new IOError(e);831 }832 }833 834 @Override835 public boolean isEmpty() {836 return file.length()>0;837 }838 839 @Override840 public void deleteFile() {841 file.delete();842 }843 844 @Override845 public boolean isSliced() {846 return false;847 }848 849 @Override850 public File getFile() {851 return file;852 }853 }854 769 855 770
Note:
See TracChangeset
for help on using the changeset viewer.