Ignore:
Timestamp:
2013-04-07T17:07:27+02:00 (12 years ago)
Author:
akks
Message:

JOSM/ImageryCache: updated MapDB (no more deadlocks, Java 1.6 compatible), less crashes, multiple-JOSM support

File:
1 edited

Legend:

Unmodified
Added
Removed
  • applications/editors/josm/plugins/imagerycache/src/org/mapdb/Volume.java

    r29363 r29484  
    2424import java.nio.ByteBuffer;
    2525import java.nio.MappedByteBuffer;
    26 import java.nio.channels.AsynchronousFileChannel;
    2726import java.nio.channels.FileChannel;
    28 import java.nio.file.StandardOpenOption;
    2927import java.util.Arrays;
     28import java.util.Map;
     29import java.util.WeakHashMap;
    3030import java.util.concurrent.ExecutionException;
    3131import java.util.concurrent.Future;
     
    3434
    3535/**
    36  * MapDB abstraction over raw storage (file, disk partition, memory etc...)
     36 * MapDB abstraction over raw storage (file, disk partition, memory etc...).
     37 * <p/>
     38 * Implementations needs to be thread safe (especially
     39 'ensureAvailable') operation.
     40 * However updates do not have to be atomic, it is clients responsibility
     41 * to ensure two threads are not writing/reading into the same location.
    3742 *
    3843 * @author Jan Kotek
     
    4146
    4247    public static final int BUF_SIZE = 1<<30;
    43     public static final int INITIAL_SIZE = 1024*32;
    4448
    4549    abstract public void ensureAvailable(final long offset);
     
    4953    abstract public void putByte(final long offset, final byte value);
    5054
    51     abstract public void putData(final long offset, final byte[] value, int size);
     55    abstract public void putData(final long offset, final byte[] src, int srcPos, int srcSize);
    5256    abstract public void putData(final long offset, final ByteBuffer buf);
    5357
     
    8892        putByte(offset, (byte)(b & 0xff));
    8993    }
     94
     95    /**
     96     * Reads a long from the indicated position
     97     */
     98    public final long getSixLong(long pos) {
     99        return
     100                ((long) (getByte(pos + 0) & 0xff) << 40) |
     101                        ((long) (getByte(pos + 1) & 0xff) << 32) |
     102                        ((long) (getByte(pos + 2) & 0xff) << 24) |
     103                        ((long) (getByte(pos + 3) & 0xff) << 16) |
     104                        ((long) (getByte(pos + 4) & 0xff) << 8) |
     105                        ((long) (getByte(pos + 5) & 0xff) << 0);
     106    }
     107
     108    /**
     109     * Writes a long to the indicated position
     110     */
     111    public final void putSixLong(long pos, long value) {
     112        if(value<0) throw new IllegalArgumentException();
     113        if(value >> (6*8)!=0)
     114                throw new IllegalArgumentException("does not fit");
     115        //TODO read/write as integer+short, might be faster
     116        putByte(pos + 0, (byte) (0xff & (value >> 40)));
     117        putByte(pos + 1, (byte) (0xff & (value >> 32)));
     118        putByte(pos + 2, (byte) (0xff & (value >> 24)));
     119        putByte(pos + 3, (byte) (0xff & (value >> 16)));
     120        putByte(pos + 4, (byte) (0xff & (value >> 8)));
     121        putByte(pos + 5, (byte) (0xff & (value >> 0)));
     122
     123    }
     124
    90125
    91126    /** returns underlying file if it exists */
     
    111146    public static Factory fileFactory(final boolean readOnly, final boolean RAF, final File indexFile){
    112147        return fileFactory(readOnly, RAF, indexFile,
    113                 new File(indexFile.getPath() + StorageDirect.DATA_FILE_EXT),
    114                 new File(indexFile.getPath() + StorageJournaled.TRANS_LOG_FILE_EXT));
     148                new File(indexFile.getPath() + StoreDirect.DATA_FILE_EXT),
     149                new File(indexFile.getPath() + StoreWAL.TRANS_LOG_FILE_EXT));
    115150    }
    116151
     
    142177        return new Factory() {
    143178
    144             @Override public Volume createIndexVolume() {
     179            @Override public synchronized  Volume createIndexVolume() {
    145180                return new MemoryVol(useDirectBuffer);
    146181            }
    147182
    148             @Override public Volume createPhysVolume() {
     183            @Override public synchronized Volume createPhysVolume() {
    149184                return new MemoryVol(useDirectBuffer);
    150185            }
    151186
    152             @Override public Volume createTransLogVolume() {
     187            @Override public synchronized Volume createTransLogVolume() {
    153188                return new MemoryVol(useDirectBuffer);
    154189            }
     
    181216            //check for most common case, this is already mapped
    182217            if(buffersPos<buffers.length && buffers[buffersPos]!=null &&
    183                     buffers[buffersPos].capacity()>=offset% BUF_SIZE)
     218                    buffers[buffersPos].capacity()>=offset% BUF_SIZE){
    184219                return;
     220            }
    185221
    186222            growLock.lock();
     
    191227                    return;
    192228
     229                ByteBuffer[] buffers2 = buffers;
    193230
    194231                //grow array if necessary
    195                 if(buffersPos>=buffers.length){
    196                     buffers = Arrays.copyOf(buffers, Math.max(buffersPos, buffers.length * 2));
    197                 }
     232                if(buffersPos>=buffers2.length){
     233                    buffers2 = Arrays.copyOf(buffers2, Math.max(buffersPos+1, buffers2.length * 2));
     234                }
     235
    198236
    199237                //just remap file buffer
    200                 ByteBuffer newBuf = makeNewBuffer(offset);
     238                if( buffers2[buffersPos] == null){
     239                    //make sure previous buffer is fully expanded
     240                    if(buffersPos>0){
     241                        ByteBuffer oldPrev = buffers2[buffersPos-1];
     242                        if(oldPrev == null || oldPrev.capacity()!=BUF_SIZE){
     243                            buffers2[buffersPos-1]  = makeNewBuffer(1L*buffersPos*BUF_SIZE-1,buffers2);
     244                        }
     245                    }
     246                }
     247
     248
     249                ByteBuffer newBuf = makeNewBuffer(offset, buffers2);
    201250                if(readOnly)
    202251                    newBuf = newBuf.asReadOnlyBuffer();
    203252
    204                 buffers[buffersPos] = newBuf;
     253                buffers2[buffersPos] = newBuf;
     254
     255                buffers = buffers2;
    205256            }finally{
    206257                growLock.unlock();
     
    208259        }
    209260
    210         protected abstract ByteBuffer makeNewBuffer(long offset);
     261        protected abstract ByteBuffer makeNewBuffer(long offset, ByteBuffer[] buffers2);
    211262
    212263        protected final ByteBuffer internalByteBuffer(long offset) {
     
    233284
    234285
    235         @Override public final void putData(final long offset, final byte[] value, final int size) {
     286        @Override public void putData(final long offset, final byte[] src, int srcPos, int srcSize){
    236287            final ByteBuffer b1 = internalByteBuffer(offset);
    237288            final int bufPos = (int) (offset% BUF_SIZE);
     
    239290            synchronized (b1){
    240291                b1.position(bufPos);
    241                 b1.put(value, 0, size);
     292                b1.put(src, srcPos, srcSize);
    242293            }
    243294        }
     
    264315            try{
    265316                return internalByteBuffer(offset).getInt((int) (offset% BUF_SIZE));
     317            } catch (NullPointerException e) {
     318                throw new RuntimeException(""+offset,e);
     319
    266320            }catch(IndexOutOfBoundsException e){
    267321                throw new IOError(new EOFException());
     
    346400        protected final FileChannel.MapMode mapMode;
    347401        protected final java.io.RandomAccessFile raf;
     402
     403        protected final Map<ByteBuffer, String> unreleasedBuffers =
     404                Utils.isWindows() ? new WeakHashMap<ByteBuffer, String>() : null;
    348405
    349406        static final int BUF_SIZE_INC = 1024*1024;
     
    370427                }else{
    371428                    buffers = new ByteBuffer[1];
    372                     buffers[0] = fileChannel.map(mapMode, 0, INITIAL_SIZE);
    373                     if(mapMode == FileChannel.MapMode.READ_ONLY)
    374                         buffers[0] = buffers[0].asReadOnlyBuffer();
     429//                    buffers[0] = fileChannel.map(mapMode, 0, INITIAL_SIZE);
     430//                    if(mapMode == FileChannel.MapMode.READ_ONLY)
     431//                        buffers[0] = buffers[0].asReadOnlyBuffer();
    375432
    376433                }
     
    394451                }
    395452                buffers = null;
     453                if(unreleasedBuffers!=null){
     454                    for(ByteBuffer b:unreleasedBuffers.keySet().toArray(new MappedByteBuffer[0])){
     455                        if(b!=null && (b instanceof MappedByteBuffer)){
     456                            unmap((MappedByteBuffer) b);
     457                        }
     458                    }
     459                }
     460
    396461            } catch (IOException e) {
    397462                throw new IOError(e);
     
    428493
    429494        @Override
    430         protected ByteBuffer makeNewBuffer(long offset) {
    431             try {
    432                 //unmap old buffer on windows
    433                 int bufPos = (int) (offset/BUF_SIZE);
    434                 if(bufPos<buffers.length && buffers[bufPos]!=null){
    435                     unmap((MappedByteBuffer) buffers[bufPos]);
    436                     buffers[bufPos] = null;
    437                 }
    438 
     495        protected ByteBuffer makeNewBuffer(long offset, ByteBuffer[] buffers2) {
     496            try {
    439497                long newBufSize =  offset% BUF_SIZE;
    440498                newBufSize = newBufSize + newBufSize%BUF_SIZE_INC; //round to BUF_SIZE_INC
    441                 return fileChannel.map(
    442                         mapMode,
    443                         offset - offset% BUF_SIZE, newBufSize );
     499                ByteBuffer buf = fileChannel.map( mapMode, offset - offset% BUF_SIZE, newBufSize );
     500                if(unreleasedBuffers!=null) unreleasedBuffers.put(buf, "");
     501                return buf;
    444502            } catch (IOException e) {
    445503                if(e.getCause()!=null && e.getCause() instanceof OutOfMemoryError){
     
    463521            super(false);
    464522            this.useDirectBuffer = useDirectBuffer;
    465             ByteBuffer b0 = useDirectBuffer?
    466                     ByteBuffer.allocateDirect(INITIAL_SIZE) :
    467                     ByteBuffer.allocate(INITIAL_SIZE);
    468             buffers = new ByteBuffer[]{b0};
    469         }
    470 
    471         @Override protected ByteBuffer makeNewBuffer(long offset) {
     523//            ByteBuffer b0 = useDirectBuffer?
     524//                    ByteBuffer.allocateDirect(INITIAL_SIZE) :
     525//                    ByteBuffer.allocate(INITIAL_SIZE);
     526//            buffers = new ByteBuffer[]{b0};
     527            buffers=new ByteBuffer[1];
     528        }
     529
     530        @Override protected ByteBuffer makeNewBuffer(long offset, ByteBuffer[] buffers2) {
    472531            final int newBufSize = Utils.nextPowTwo((int) (offset % BUF_SIZE));
    473532            //double size of existing in-memory-buffer
     
    476535                    ByteBuffer.allocate(newBufSize);
    477536            final int buffersPos = (int) (offset/ BUF_SIZE);
    478             final ByteBuffer oldBuffer = buffers[buffersPos];
     537            final ByteBuffer oldBuffer = buffers2[buffersPos];
    479538            if(oldBuffer!=null){
    480539                //copy old buffer if it exists
     
    580639
    581640        @Override
    582         synchronized public void putData(long offset, byte[] value, int size) {
    583             try {
    584                 if(pos!=offset){
    585                     raf.seek(offset);
    586                 }
    587                 pos=offset+size;
    588                 raf.write(value,0,size);
     641        synchronized public void putData(final long offset, final byte[] src, int srcPos, int srcSize){
     642            try {
     643                if(pos!=offset){
     644                    raf.seek(offset);
     645                }
     646                pos=offset+srcSize;
     647                raf.write(src,srcPos,srcSize);
    589648            } catch (IOException e) {
    590649                throw new IOError(e);
     
    602661                byte[] b = new byte[size];
    603662                buf.get(b);
    604                 putData(offset, b, size);
     663                putData(offset, b, 0, size);
    605664            } catch (IOException e) {
    606665                throw new IOError(e);
     
    708767    }
    709768
    710     public static class AsyncFileChannelVol extends Volume{
    711 
    712 
    713         protected AsynchronousFileChannel channel;
    714         protected final boolean readOnly;
    715         protected final File file;
    716 
    717         public AsyncFileChannelVol(File file, boolean readOnly){
    718             this.readOnly = readOnly;
    719             this.file = file;
    720             try {
    721                 this.channel = readOnly?
    722                         AsynchronousFileChannel.open(file.toPath(),StandardOpenOption.READ):
    723                         AsynchronousFileChannel.open(file.toPath(),StandardOpenOption.READ, StandardOpenOption.WRITE);
    724 
    725             } catch (IOException e) {
    726                 throw new IOError(e);
    727             }
    728         }
    729 
    730         @Override
    731         public void ensureAvailable(long offset) {
    732             //we do not have a list of ByteBuffers, so ensure size does not have to do anything
    733         }
    734 
    735 
    736 
    737         protected void await(Future<Integer> future, int size) {
    738             try {
    739                 int res = future.get();
    740                 if(res!=size) throw new InternalError("not enough bytes");
    741             } catch (InterruptedException e) {
    742                 throw new RuntimeException(e);
    743             } catch (ExecutionException e) {
    744                 throw new RuntimeException(e);
    745             }
    746         }
    747 
    748         @Override
    749         public void putByte(long offset, byte value) {
    750             ByteBuffer b = ByteBuffer.allocate(1);
    751             b.put(0, value);
    752             await(channel.write(b, offset),1);
    753         }
    754         @Override
    755         public void putInt(long offset, int value) {
    756             ByteBuffer b = ByteBuffer.allocate(4);
    757             b.putInt(0, value);
    758             await(channel.write(b, offset),4);
    759         }
    760 
    761         @Override
    762         public void putLong(long offset, long value) {
    763             ByteBuffer b = ByteBuffer.allocate(8);
    764             b.putLong(0, value);
    765             await(channel.write(b, offset),8);
    766         }
    767 
    768         @Override
    769         public void putData(long offset, byte[] value, int size) {
    770             ByteBuffer b = ByteBuffer.wrap(value);
    771             b.limit(size);
    772             await(channel.write(b,offset),size);
    773         }
    774 
    775         @Override
    776         public void putData(long offset, ByteBuffer buf) {
    777             await(channel.write(buf,offset), buf.limit() - buf.position());
    778         }
    779 
    780 
    781 
    782         @Override
    783         public long getLong(long offset) {
    784             ByteBuffer b = ByteBuffer.allocate(8);
    785             await(channel.read(b, offset), 8);
    786             b.rewind();
    787             return b.getLong();
    788         }
    789 
    790         @Override
    791         public byte getByte(long offset) {
    792             ByteBuffer b = ByteBuffer.allocate(1);
    793             await(channel.read(b, offset), 1);
    794             b.rewind();
    795             return b.get();
    796         }
    797 
    798         @Override
    799         public int getInt(long offset) {
    800             ByteBuffer b = ByteBuffer.allocate(4);
    801             await(channel.read(b, offset), 4);
    802             b.rewind();
    803             return b.getInt();
    804         }
    805 
    806 
    807 
    808         @Override
    809         public DataInput2 getDataInput(long offset, int size) {
    810             ByteBuffer b = ByteBuffer.allocate(size);
    811             await(channel.read(b, offset), size);
    812             b.rewind();
    813             return new DataInput2(b,0);
    814         }
    815 
    816         @Override
    817         public void close() {
    818             try {
    819                 channel.close();
    820             } catch (IOException e) {
    821                 throw new IOError(e);
    822             }
    823         }
    824 
    825         @Override
    826         public void sync() {
    827             try {
    828                 channel.force(true);
    829             } catch (IOException e) {
    830                 throw new IOError(e);
    831             }
    832         }
    833 
    834         @Override
    835         public boolean isEmpty() {
    836             return file.length()>0;
    837         }
    838 
    839         @Override
    840         public void deleteFile() {
    841             file.delete();
    842         }
    843 
    844         @Override
    845         public boolean isSliced() {
    846             return false;
    847         }
    848 
    849         @Override
    850         public File getFile() {
    851             return file;
    852         }
    853     }
    854769
    855770
Note: See TracChangeset for help on using the changeset viewer.