[8403] | 1 | // License: GPL. For details, see LICENSE file.
|
---|
| 2 | package org.openstreetmap.josm.data.cache;
|
---|
| 3 |
|
---|
[8673] | 4 | import java.io.IOException;
|
---|
| 5 | import java.net.URL;
|
---|
[8403] | 6 | import java.util.Iterator;
|
---|
| 7 | import java.util.Map;
|
---|
| 8 | import java.util.concurrent.ConcurrentHashMap;
|
---|
| 9 | import java.util.concurrent.LinkedBlockingDeque;
|
---|
| 10 | import java.util.concurrent.Semaphore;
|
---|
[11438] | 11 | import java.util.concurrent.ThreadPoolExecutor;
|
---|
[8403] | 12 | import java.util.concurrent.TimeUnit;
|
---|
| 13 |
|
---|
| 14 | import org.openstreetmap.josm.Main;
|
---|
| 15 |
|
---|
| 16 | /**
|
---|
| 17 | * Queue for ThreadPoolExecutor that implements per-host limit. It will acquire a semaphore for each task
|
---|
| 18 | * and it will set a runnable task with semaphore release, when job has finished.
|
---|
[10723] | 19 | * <p>
|
---|
[8403] | 20 | * This implementation doesn't guarantee to have at most hostLimit connections per host[1], and it doesn't
|
---|
[10723] | 21 | * guarantee that all threads will be busy, when there is work for them[2]. <br>
|
---|
[8403] | 22 | * [1] More connection per host may happen, when ThreadPoolExecutor is growing its pool, and thus
|
---|
[10723] | 23 | * tasks do not go through the Queue <br>
|
---|
[8403] | 24 | * [2] If we have a queue, and for all hosts in queue we will fail to acquire semaphore, the thread
|
---|
| 25 | * take the first available job and wait for semaphore. It might be the case, that semaphore was released
|
---|
| 26 | * for some task further in queue, but this implementation doesn't try to detect such situation
|
---|
| 27 | *
|
---|
[10723] | 28 | * @author Wiktor Niesiobędzki
|
---|
[8403] | 29 | */
|
---|
| 30 | public class HostLimitQueue extends LinkedBlockingDeque<Runnable> {
|
---|
[8418] | 31 | private static final long serialVersionUID = 1L;
|
---|
[8403] | 32 |
|
---|
[8418] | 33 | private final Map<String, Semaphore> hostSemaphores = new ConcurrentHashMap<>();
|
---|
| 34 | private final int hostLimit;
|
---|
[8403] | 35 |
|
---|
[11438] | 36 | private ThreadPoolExecutor executor;
|
---|
| 37 |
|
---|
| 38 | private int corePoolSize;
|
---|
| 39 |
|
---|
| 40 | private int maximumPoolSize;
|
---|
| 41 |
|
---|
[8403] | 42 | /**
|
---|
| 43 | * Creates an unbounded queue
|
---|
| 44 | * @param hostLimit how many parallel calls to host to allow
|
---|
| 45 | */
|
---|
| 46 | public HostLimitQueue(int hostLimit) {
|
---|
| 47 | super(); // create unbounded queue
|
---|
| 48 | this.hostLimit = hostLimit;
|
---|
| 49 | }
|
---|
| 50 |
|
---|
| 51 | private JCSCachedTileLoaderJob<?, ?> findJob() {
|
---|
| 52 | for (Iterator<Runnable> it = iterator(); it.hasNext();) {
|
---|
| 53 | Runnable r = it.next();
|
---|
| 54 | if (r instanceof JCSCachedTileLoaderJob) {
|
---|
| 55 | JCSCachedTileLoaderJob<?, ?> job = (JCSCachedTileLoaderJob<?, ?>) r;
|
---|
| 56 | if (tryAcquireSemaphore(job)) {
|
---|
| 57 | if (remove(job)) {
|
---|
| 58 | return job;
|
---|
| 59 | } else {
|
---|
[9004] | 60 | // we have acquired the semaphore, but we didn't manage to remove job, as someone else did
|
---|
[8403] | 61 | // release the semaphore and look for another candidate
|
---|
| 62 | releaseSemaphore(job);
|
---|
| 63 | }
|
---|
| 64 | } else {
|
---|
[8673] | 65 | URL url = null;
|
---|
| 66 | try {
|
---|
| 67 | url = job.getUrl();
|
---|
| 68 | } catch (IOException e) {
|
---|
[10420] | 69 | Main.debug(e);
|
---|
[8673] | 70 | }
|
---|
[9004] | 71 | Main.debug("TMS - Skipping job {0} because host limit reached", url);
|
---|
[8403] | 72 | }
|
---|
| 73 | }
|
---|
| 74 | }
|
---|
| 75 | return null;
|
---|
| 76 | }
|
---|
| 77 |
|
---|
| 78 | @Override
|
---|
| 79 | public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
|
---|
| 80 | Runnable job = findJob();
|
---|
| 81 | if (job != null) {
|
---|
| 82 | return job;
|
---|
| 83 | }
|
---|
| 84 | job = pollFirst(timeout, unit);
|
---|
| 85 | if (job != null) {
|
---|
[11444] | 86 | try {
|
---|
| 87 | boolean gotLock = tryAcquireSemaphore(job, timeout, unit);
|
---|
| 88 | return gotLock ? job : null;
|
---|
| 89 | } catch (InterruptedException e) {
|
---|
| 90 | // acquire my got interrupted, first offer back what was taken
|
---|
[11546] | 91 | if (!offer(job)) {
|
---|
| 92 | Main.warn("Unable to offer back " + job);
|
---|
| 93 | }
|
---|
[11444] | 94 | throw e;
|
---|
| 95 | }
|
---|
[8403] | 96 | }
|
---|
| 97 | return job;
|
---|
| 98 | }
|
---|
| 99 |
|
---|
| 100 | @Override
|
---|
| 101 | public Runnable take() throws InterruptedException {
|
---|
| 102 | Runnable job = findJob();
|
---|
| 103 | if (job != null) {
|
---|
| 104 | return job;
|
---|
| 105 | }
|
---|
| 106 | job = takeFirst();
|
---|
[11444] | 107 | try {
|
---|
| 108 | acquireSemaphore(job);
|
---|
| 109 | } catch (InterruptedException e) {
|
---|
| 110 | // acquire my got interrupted, first offer back what was taken
|
---|
[11546] | 111 | if (!offer(job)) {
|
---|
| 112 | Main.warn("Unable to offer back " + job);
|
---|
| 113 | }
|
---|
[11444] | 114 | throw e;
|
---|
| 115 | }
|
---|
[8403] | 116 | return job;
|
---|
| 117 | }
|
---|
| 118 |
|
---|
[11438] | 119 | /**
|
---|
| 120 | * Set the executor for which this queue works. It's needed to spawn new threads.
|
---|
| 121 | * See: http://stackoverflow.com/questions/9622599/java-threadpoolexecutor-strategy-direct-handoff-with-queue#
|
---|
| 122 | *
|
---|
[11445] | 123 | * @param executor executor for which this queue works
|
---|
[11438] | 124 | */
|
---|
| 125 | public void setExecutor(ThreadPoolExecutor executor) {
|
---|
| 126 | this.executor = executor;
|
---|
| 127 | this.maximumPoolSize = executor.getMaximumPoolSize();
|
---|
| 128 | this.corePoolSize = executor.getCorePoolSize();
|
---|
| 129 | }
|
---|
| 130 |
|
---|
| 131 | @Override
|
---|
[11445] | 132 | public boolean offer(Runnable e) {
|
---|
| 133 | if (!super.offer(e)) {
|
---|
[11438] | 134 | return false;
|
---|
| 135 | }
|
---|
| 136 |
|
---|
| 137 | if (executor != null) {
|
---|
| 138 | // See: http://stackoverflow.com/questions/9622599/java-threadpoolexecutor-strategy-direct-handoff-with-queue#
|
---|
| 139 | // force spawn of a thread if not reached maximum
|
---|
| 140 | int currentPoolSize = executor.getPoolSize();
|
---|
| 141 | if (currentPoolSize < maximumPoolSize
|
---|
[11445] | 142 | && currentPoolSize >= corePoolSize) {
|
---|
[11438] | 143 | executor.setCorePoolSize(currentPoolSize + 1);
|
---|
| 144 | executor.setCorePoolSize(corePoolSize);
|
---|
| 145 | }
|
---|
| 146 | }
|
---|
| 147 | return true;
|
---|
| 148 | }
|
---|
| 149 |
|
---|
[10378] | 150 | private Semaphore getSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
|
---|
[8673] | 151 | String host;
|
---|
| 152 | try {
|
---|
| 153 | host = job.getUrl().getHost();
|
---|
| 154 | } catch (IOException e) {
|
---|
| 155 | // do not pass me illegal URL's
|
---|
| 156 | throw new IllegalArgumentException(e);
|
---|
| 157 | }
|
---|
[8403] | 158 | Semaphore limit = hostSemaphores.get(host);
|
---|
| 159 | if (limit == null) {
|
---|
[8510] | 160 | synchronized (hostSemaphores) {
|
---|
[8403] | 161 | limit = hostSemaphores.get(host);
|
---|
| 162 | if (limit == null) {
|
---|
| 163 | limit = new Semaphore(hostLimit);
|
---|
| 164 | hostSemaphores.put(host, limit);
|
---|
| 165 | }
|
---|
| 166 | }
|
---|
| 167 | }
|
---|
| 168 | return limit;
|
---|
| 169 | }
|
---|
| 170 |
|
---|
| 171 | private void acquireSemaphore(Runnable job) throws InterruptedException {
|
---|
| 172 | if (job instanceof JCSCachedTileLoaderJob) {
|
---|
| 173 | final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job;
|
---|
[11381] | 174 | getSemaphore(jcsJob).acquire();
|
---|
| 175 | jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob));
|
---|
[8403] | 176 | }
|
---|
| 177 | }
|
---|
| 178 |
|
---|
[8510] | 179 | private boolean tryAcquireSemaphore(final JCSCachedTileLoaderJob<?, ?> job) {
|
---|
[8403] | 180 | boolean ret = true;
|
---|
| 181 | Semaphore limit = getSemaphore(job);
|
---|
| 182 | if (limit != null) {
|
---|
| 183 | ret = limit.tryAcquire();
|
---|
| 184 | if (ret) {
|
---|
[10608] | 185 | job.setFinishedTask(() -> releaseSemaphore(job));
|
---|
[8403] | 186 | }
|
---|
| 187 | }
|
---|
| 188 | return ret;
|
---|
| 189 | }
|
---|
| 190 |
|
---|
[11444] | 191 | private boolean tryAcquireSemaphore(Runnable job, long timeout, TimeUnit unit) throws InterruptedException {
|
---|
| 192 | boolean ret = true;
|
---|
| 193 | if (job instanceof JCSCachedTileLoaderJob) {
|
---|
| 194 | final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job;
|
---|
| 195 | Semaphore limit = getSemaphore(jcsJob);
|
---|
| 196 | if (limit != null) {
|
---|
| 197 | ret = limit.tryAcquire(timeout, unit);
|
---|
| 198 | if (ret) {
|
---|
| 199 | jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob));
|
---|
| 200 | }
|
---|
| 201 | }
|
---|
| 202 | }
|
---|
| 203 | return ret;
|
---|
| 204 | }
|
---|
| 205 |
|
---|
[8510] | 206 | private void releaseSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
|
---|
[8403] | 207 | Semaphore limit = getSemaphore(job);
|
---|
| 208 | if (limit != null) {
|
---|
| 209 | limit.release();
|
---|
| 210 | if (limit.availablePermits() > hostLimit) {
|
---|
| 211 | Main.warn("More permits than it should be");
|
---|
| 212 | }
|
---|
| 213 | }
|
---|
| 214 | }
|
---|
[8514] | 215 | }
|
---|