source: josm/trunk/src/org/openstreetmap/josm/data/cache/HostLimitQueue.java@ 11444

Last change on this file since 11444 was 11444, checked in by wiktorn, 7 years ago

Fix disappearing download tasks.

When ThreadPoolExecutor is adapting it's size, it interrupts its threads from time to time. HostLimitQueue had a bug, that when interrupted when waiting to acquire lock, it didn't return the job back to the queue.

Added testcases that helped debug this and that verify, that we're actually doing host based limiting.

Sponsored by "Girl, interrupted" movie with Angelina Jolie.

See: #14166

  • Property svn:eol-style set to native
File size: 7.4 KB
Line 
1// License: GPL. For details, see LICENSE file.
2package org.openstreetmap.josm.data.cache;
3
4import java.io.IOException;
5import java.net.URL;
6import java.util.Iterator;
7import java.util.Map;
8import java.util.concurrent.ConcurrentHashMap;
9import java.util.concurrent.LinkedBlockingDeque;
10import java.util.concurrent.Semaphore;
11import java.util.concurrent.ThreadPoolExecutor;
12import java.util.concurrent.TimeUnit;
13
14import org.openstreetmap.josm.Main;
15
16/**
17 * Queue for ThreadPoolExecutor that implements per-host limit. It will acquire a semaphore for each task
18 * and it will set a runnable task with semaphore release, when job has finished.
19 * <p>
20 * This implementation doesn't guarantee to have at most hostLimit connections per host[1], and it doesn't
21 * guarantee that all threads will be busy, when there is work for them[2]. <br>
22 * [1] More connection per host may happen, when ThreadPoolExecutor is growing its pool, and thus
23 * tasks do not go through the Queue <br>
24 * [2] If we have a queue, and for all hosts in queue we will fail to acquire semaphore, the thread
25 * take the first available job and wait for semaphore. It might be the case, that semaphore was released
26 * for some task further in queue, but this implementation doesn't try to detect such situation
27 *
28 * @author Wiktor Niesiobędzki
29 */
30public class HostLimitQueue extends LinkedBlockingDeque<Runnable> {
31 private static final long serialVersionUID = 1L;
32
33 private final Map<String, Semaphore> hostSemaphores = new ConcurrentHashMap<>();
34 private final int hostLimit;
35
36 private ThreadPoolExecutor executor;
37
38 private int corePoolSize;
39
40 private int maximumPoolSize;
41
42 /**
43 * Creates an unbounded queue
44 * @param hostLimit how many parallel calls to host to allow
45 */
46 public HostLimitQueue(int hostLimit) {
47 super(); // create unbounded queue
48 this.hostLimit = hostLimit;
49 }
50
51 private JCSCachedTileLoaderJob<?, ?> findJob() {
52 for (Iterator<Runnable> it = iterator(); it.hasNext();) {
53 Runnable r = it.next();
54 if (r instanceof JCSCachedTileLoaderJob) {
55 JCSCachedTileLoaderJob<?, ?> job = (JCSCachedTileLoaderJob<?, ?>) r;
56 if (tryAcquireSemaphore(job)) {
57 if (remove(job)) {
58 return job;
59 } else {
60 // we have acquired the semaphore, but we didn't manage to remove job, as someone else did
61 // release the semaphore and look for another candidate
62 releaseSemaphore(job);
63 }
64 } else {
65 URL url = null;
66 try {
67 url = job.getUrl();
68 } catch (IOException e) {
69 Main.debug(e);
70 }
71 Main.debug("TMS - Skipping job {0} because host limit reached", url);
72 }
73 }
74 }
75 return null;
76 }
77
78 @Override
79 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
80 Runnable job = findJob();
81 if (job != null) {
82 return job;
83 }
84 job = pollFirst(timeout, unit);
85 if (job != null) {
86 try {
87 boolean gotLock = tryAcquireSemaphore(job, timeout, unit);
88 return gotLock ? job : null;
89 } catch (InterruptedException e) {
90 // acquire my got interrupted, first offer back what was taken
91 offer(job);
92 throw e;
93 }
94 }
95 return job;
96 }
97
98 @Override
99 public Runnable take() throws InterruptedException {
100 Runnable job = findJob();
101 if (job != null) {
102 return job;
103 }
104 job = takeFirst();
105 try {
106 acquireSemaphore(job);
107 } catch (InterruptedException e) {
108 // acquire my got interrupted, first offer back what was taken
109 offer(job);
110 throw e;
111 }
112 return job;
113 }
114
115 /**
116 * Set the executor for which this queue works. It's needed to spawn new threads.
117 * See: http://stackoverflow.com/questions/9622599/java-threadpoolexecutor-strategy-direct-handoff-with-queue#
118 *
119 * @param executor
120 */
121
122 public void setExecutor(ThreadPoolExecutor executor) {
123 this.executor = executor;
124 this.maximumPoolSize = executor.getMaximumPoolSize();
125 this.corePoolSize = executor.getCorePoolSize();
126 }
127
128 @Override
129 public boolean offer(Runnable e)
130 {
131 if (super.offer(e) == false)
132 {
133 return false;
134 }
135
136 if (executor != null) {
137 // See: http://stackoverflow.com/questions/9622599/java-threadpoolexecutor-strategy-direct-handoff-with-queue#
138 // force spawn of a thread if not reached maximum
139 int currentPoolSize = executor.getPoolSize();
140 if (currentPoolSize < maximumPoolSize
141 && currentPoolSize >= corePoolSize)
142 {
143 executor.setCorePoolSize(currentPoolSize + 1);
144 executor.setCorePoolSize(corePoolSize);
145 }
146 }
147 return true;
148 }
149
150 private Semaphore getSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
151 String host;
152 try {
153 host = job.getUrl().getHost();
154 } catch (IOException e) {
155 // do not pass me illegal URL's
156 throw new IllegalArgumentException(e);
157 }
158 Semaphore limit = hostSemaphores.get(host);
159 if (limit == null) {
160 synchronized (hostSemaphores) {
161 limit = hostSemaphores.get(host);
162 if (limit == null) {
163 limit = new Semaphore(hostLimit);
164 hostSemaphores.put(host, limit);
165 }
166 }
167 }
168 return limit;
169 }
170
171 private void acquireSemaphore(Runnable job) throws InterruptedException {
172 if (job instanceof JCSCachedTileLoaderJob) {
173 final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job;
174 getSemaphore(jcsJob).acquire();
175 jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob));
176 }
177 }
178
179 private boolean tryAcquireSemaphore(final JCSCachedTileLoaderJob<?, ?> job) {
180 boolean ret = true;
181 Semaphore limit = getSemaphore(job);
182 if (limit != null) {
183 ret = limit.tryAcquire();
184 if (ret) {
185 job.setFinishedTask(() -> releaseSemaphore(job));
186 }
187 }
188 return ret;
189 }
190
191 private boolean tryAcquireSemaphore(Runnable job, long timeout, TimeUnit unit) throws InterruptedException {
192 boolean ret = true;
193 if (job instanceof JCSCachedTileLoaderJob) {
194 final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job;
195 Semaphore limit = getSemaphore(jcsJob);
196 if (limit != null) {
197 ret = limit.tryAcquire(timeout, unit);
198 if (ret) {
199 jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob));
200 }
201 }
202 }
203 return ret;
204 }
205
206 private void releaseSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
207 Semaphore limit = getSemaphore(job);
208 if (limit != null) {
209 limit.release();
210 if (limit.availablePermits() > hostLimit) {
211 Main.warn("More permits than it should be");
212 }
213 }
214 }
215}
Note: See TracBrowser for help on using the repository browser.