Ignore:
Timestamp:
2015-05-02T22:53:56+02:00 (9 years ago)
Author:
bastiK
Message:

applied #10902 - TMS simultaneous connections (2nd patch by wiktorn)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/org/openstreetmap/josm/data/imagery/TMSCachedTileLoaderJob.java

    r8307 r8314  
    88import java.util.concurrent.ConcurrentHashMap;
    99import java.util.concurrent.Executor;
    10 import java.util.concurrent.LinkedBlockingDeque;
    1110import java.util.concurrent.Semaphore;
    1211import java.util.concurrent.ThreadPoolExecutor;
     
    4241
    4342    /**
     43     * Limit definition for per host concurrent connections
     44     */
     45    public final static IntegerProperty HOST_LIMIT = new IntegerProperty("imagery.tms.tmsloader.maxjobsperhost", 6);
     46
     47     /*
     48     * Host limit guards the area - between submission to the queue up to loading is finished. It uses executionGuard method
     49     * from JCSCachedTileLoaderJob to acquire the semaphore, and releases it - when loadingFinished is called (but not when
     50     * LoadResult.GUARD_REJECTED is set)
     51     *
     52     */
     53
     54    private Semaphore getSemaphore() {
     55        String host = getUrl().getHost();
     56        Semaphore limit = HOST_LIMITS.get(host);
     57        if (limit == null) {
     58            synchronized(HOST_LIMITS) {
     59                limit = HOST_LIMITS.get(host);
     60                if (limit == null) {
     61                    limit = new Semaphore(HOST_LIMIT.get().intValue());
     62                    HOST_LIMITS.put(host, limit);
     63                }
     64            }
     65        }
     66        return limit;
     67    }
     68
     69    private boolean acquireSemaphore() {
     70        boolean ret = true;
     71        Semaphore limit = getSemaphore();
     72        if (limit != null) {
     73            ret = limit.tryAcquire();
     74            if (!ret) {
     75                Main.debug("rejecting job because of per host limit");
     76            }
     77        }
     78        return ret;
     79
     80    }
     81
     82    private void releaseSemaphore() {
     83        Semaphore limit = getSemaphore();
     84        if (limit != null) {
     85            limit.release();
     86        }
     87    }
     88
     89
     90    private static Map<String, Semaphore> HOST_LIMITS = new ConcurrentHashMap<>();
     91
     92    /**
    4493     * overrides the THREAD_LIMIT in superclass, as we want to have separate limit and pool for TMS
    4594     */
    46     public static final IntegerProperty THREAD_LIMIT = new IntegerProperty("imagery.tms.tmsloader.maxjobs", 25);
    47 
    48     /**
    49      * Limit definition for per host concurrent connections
    50      */
    51     public static final IntegerProperty HOST_LIMIT = new IntegerProperty("imagery.tms.tmsloader.maxjobsperhost", 6);
    52 
    53 
    54     private static class LIFOQueue extends LinkedBlockingDeque<Runnable> {
    55         public LIFOQueue(int capacity) {
    56             super(capacity);
    57         }
    58 
    59         private final static Semaphore getSemaphore(Runnable r) {
    60             if (!(r instanceof TMSCachedTileLoaderJob))
    61                 return null;
    62             TMSCachedTileLoaderJob cachedJob = (TMSCachedTileLoaderJob) r;
    63             Semaphore limit = HOST_LIMITS.get(cachedJob.getUrl().getHost());
    64             if (limit == null) {
    65                 synchronized(HOST_LIMITS) {
    66                     limit = HOST_LIMITS.get(cachedJob.getUrl().getHost());
    67                     if (limit == null) {
    68                         limit = new Semaphore(HOST_LIMIT.get().intValue());
    69                         HOST_LIMITS.put(cachedJob.getUrl().getHost(), limit);
    70                     }
    71                 }
    72             }
    73             return limit;
    74         }
    75 
    76         private boolean acquireSemaphore(Runnable r) {
    77             boolean ret = true;
    78             Semaphore limit = getSemaphore(r);
    79             if (limit != null) {
    80                 ret = limit.tryAcquire();
    81                 if (!ret) {
    82                     Main.debug("rejecting job because of per host limit");
    83                 }
    84             }
    85             return ret;
    86         }
    87 
    88         @Override
    89         public boolean offer(Runnable t) {
    90             return acquireSemaphore(t) && super.offerFirst(t);
    91         }
    92 
    93         private Runnable releaseSemaphore(Runnable r) {
    94             Semaphore limit = getSemaphore(r);
    95             if (limit != null)
    96                 limit.release();
    97             return r;
    98         }
    99 
    100         @Override
    101         public Runnable remove() {
    102             return releaseSemaphore(super.removeFirst());
    103         }
    104 
    105         @Override
    106         public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
    107             return releaseSemaphore(super.poll(timeout, unit));
    108         }
    109 
    110         @Override
    111         public Runnable take() throws InterruptedException {
    112             return releaseSemaphore(super.take());
    113         }
    114     }
    115 
    116     private static Map<String, Semaphore> HOST_LIMITS = new ConcurrentHashMap<>();
     95    public final static IntegerProperty THREAD_LIMIT = new IntegerProperty("imagery.tms.tmsloader.maxjobs", 25);
    11796
    11897    /**
     
    143122    }
    144123
    145 
    146124    /**
    147125     * Constructor for creating a job, to get a specific tile from cache
     
    175153     *  this doesn't needs to be synchronized, as it's not that costly to keep only one execution
    176154     *  in parallel, but URL creation and Tile.getUrl() are costly and are not needed when fetching
    177      *  data from cache
     155     *  data from cache, that's why URL creation is postponed until it's needed
    178156     *
    179157     *  We need to have static url value for TileLoaderJob, as for some TileSources we might get different
     
    234212    }
    235213
     214    @Override
     215    protected boolean executionGuard() {
     216        return acquireSemaphore();
     217    }
     218
     219    @Override
     220    protected void executionFinished() {
     221        releaseSemaphore();
     222    }
     223
    236224    public void submit() {
    237225        tile.initLoading();
     
    246234            case FAILURE:
    247235                tile.setError("Problem loading tile");
     236                // no break intentional here
    248237            case SUCCESS:
    249238                handleNoTileAtZoom();
     
    254243                    }
    255244                }
     245                // no break intentional here
    256246            case REJECTED:
    257                 // do not set anything here, leave waiting sign
     247                // do nothing
    258248            }
    259249            if (listener != null) {
Note: See TracChangeset for help on using the changeset viewer.