source: josm/trunk/src/org/openstreetmap/josm/data/cache/HostLimitQueue.java@ 11546

Last change on this file since 11546 was 11546, checked in by Don-vip, 7 years ago

sonar - squid:S899 - Return values should not be ignored when they contain the operation status code

  • Property svn:eol-style set to native
File size: 7.6 KB
Line 
1// License: GPL. For details, see LICENSE file.
2package org.openstreetmap.josm.data.cache;
3
4import java.io.IOException;
5import java.net.URL;
6import java.util.Iterator;
7import java.util.Map;
8import java.util.concurrent.ConcurrentHashMap;
9import java.util.concurrent.LinkedBlockingDeque;
10import java.util.concurrent.Semaphore;
11import java.util.concurrent.ThreadPoolExecutor;
12import java.util.concurrent.TimeUnit;
13
14import org.openstreetmap.josm.Main;
15
16/**
17 * Queue for ThreadPoolExecutor that implements per-host limit. It will acquire a semaphore for each task
18 * and it will set a runnable task with semaphore release, when job has finished.
19 * <p>
20 * This implementation doesn't guarantee to have at most hostLimit connections per host[1], and it doesn't
21 * guarantee that all threads will be busy, when there is work for them[2]. <br>
22 * [1] More connections per host may happen when ThreadPoolExecutor is growing its pool, and thus
23 * tasks do not go through the queue <br>
24 * [2] If we have a queue, and for all hosts in the queue we fail to acquire the semaphore, the thread
25 * takes the first available job and waits for its semaphore. It might be the case that the semaphore was released
26 * for some task further in the queue, but this implementation doesn't try to detect such a situation
27 *
28 * @author Wiktor Niesiobędzki
29 */
30public class HostLimitQueue extends LinkedBlockingDeque<Runnable> {
31 private static final long serialVersionUID = 1L;
32
33 private final Map<String, Semaphore> hostSemaphores = new ConcurrentHashMap<>();
34 private final int hostLimit;
35
36 private ThreadPoolExecutor executor;
37
38 private int corePoolSize;
39
40 private int maximumPoolSize;
41
42 /**
43 * Creates an unbounded queue
44 * @param hostLimit how many parallel calls to host to allow
45 */
46 public HostLimitQueue(int hostLimit) {
47 super(); // create unbounded queue
48 this.hostLimit = hostLimit;
49 }
50
51 private JCSCachedTileLoaderJob<?, ?> findJob() {
52 for (Iterator<Runnable> it = iterator(); it.hasNext();) {
53 Runnable r = it.next();
54 if (r instanceof JCSCachedTileLoaderJob) {
55 JCSCachedTileLoaderJob<?, ?> job = (JCSCachedTileLoaderJob<?, ?>) r;
56 if (tryAcquireSemaphore(job)) {
57 if (remove(job)) {
58 return job;
59 } else {
60 // we have acquired the semaphore, but we didn't manage to remove job, as someone else did
61 // release the semaphore and look for another candidate
62 releaseSemaphore(job);
63 }
64 } else {
65 URL url = null;
66 try {
67 url = job.getUrl();
68 } catch (IOException e) {
69 Main.debug(e);
70 }
71 Main.debug("TMS - Skipping job {0} because host limit reached", url);
72 }
73 }
74 }
75 return null;
76 }
77
78 @Override
79 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
80 Runnable job = findJob();
81 if (job != null) {
82 return job;
83 }
84 job = pollFirst(timeout, unit);
85 if (job != null) {
86 try {
87 boolean gotLock = tryAcquireSemaphore(job, timeout, unit);
88 return gotLock ? job : null;
89 } catch (InterruptedException e) {
90 // acquire my got interrupted, first offer back what was taken
91 if (!offer(job)) {
92 Main.warn("Unable to offer back " + job);
93 }
94 throw e;
95 }
96 }
97 return job;
98 }
99
100 @Override
101 public Runnable take() throws InterruptedException {
102 Runnable job = findJob();
103 if (job != null) {
104 return job;
105 }
106 job = takeFirst();
107 try {
108 acquireSemaphore(job);
109 } catch (InterruptedException e) {
110 // acquire my got interrupted, first offer back what was taken
111 if (!offer(job)) {
112 Main.warn("Unable to offer back " + job);
113 }
114 throw e;
115 }
116 return job;
117 }
118
119 /**
120 * Set the executor for which this queue works. It's needed to spawn new threads.
121 * See: http://stackoverflow.com/questions/9622599/java-threadpoolexecutor-strategy-direct-handoff-with-queue#
122 *
123 * @param executor executor for which this queue works
124 */
125 public void setExecutor(ThreadPoolExecutor executor) {
126 this.executor = executor;
127 this.maximumPoolSize = executor.getMaximumPoolSize();
128 this.corePoolSize = executor.getCorePoolSize();
129 }
130
131 @Override
132 public boolean offer(Runnable e) {
133 if (!super.offer(e)) {
134 return false;
135 }
136
137 if (executor != null) {
138 // See: http://stackoverflow.com/questions/9622599/java-threadpoolexecutor-strategy-direct-handoff-with-queue#
139 // force spawn of a thread if not reached maximum
140 int currentPoolSize = executor.getPoolSize();
141 if (currentPoolSize < maximumPoolSize
142 && currentPoolSize >= corePoolSize) {
143 executor.setCorePoolSize(currentPoolSize + 1);
144 executor.setCorePoolSize(corePoolSize);
145 }
146 }
147 return true;
148 }
149
150 private Semaphore getSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
151 String host;
152 try {
153 host = job.getUrl().getHost();
154 } catch (IOException e) {
155 // do not pass me illegal URL's
156 throw new IllegalArgumentException(e);
157 }
158 Semaphore limit = hostSemaphores.get(host);
159 if (limit == null) {
160 synchronized (hostSemaphores) {
161 limit = hostSemaphores.get(host);
162 if (limit == null) {
163 limit = new Semaphore(hostLimit);
164 hostSemaphores.put(host, limit);
165 }
166 }
167 }
168 return limit;
169 }
170
171 private void acquireSemaphore(Runnable job) throws InterruptedException {
172 if (job instanceof JCSCachedTileLoaderJob) {
173 final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job;
174 getSemaphore(jcsJob).acquire();
175 jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob));
176 }
177 }
178
179 private boolean tryAcquireSemaphore(final JCSCachedTileLoaderJob<?, ?> job) {
180 boolean ret = true;
181 Semaphore limit = getSemaphore(job);
182 if (limit != null) {
183 ret = limit.tryAcquire();
184 if (ret) {
185 job.setFinishedTask(() -> releaseSemaphore(job));
186 }
187 }
188 return ret;
189 }
190
191 private boolean tryAcquireSemaphore(Runnable job, long timeout, TimeUnit unit) throws InterruptedException {
192 boolean ret = true;
193 if (job instanceof JCSCachedTileLoaderJob) {
194 final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job;
195 Semaphore limit = getSemaphore(jcsJob);
196 if (limit != null) {
197 ret = limit.tryAcquire(timeout, unit);
198 if (ret) {
199 jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob));
200 }
201 }
202 }
203 return ret;
204 }
205
206 private void releaseSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
207 Semaphore limit = getSemaphore(job);
208 if (limit != null) {
209 limit.release();
210 if (limit.availablePermits() > hostLimit) {
211 Main.warn("More permits than it should be");
212 }
213 }
214 }
215}
Note: See TracBrowser for help on using the repository browser.