source: josm/trunk/src/org/openstreetmap/josm/data/cache/HostLimitQueue.java@ 10420

Last change on this file since 10420 was 10420, checked in by Don-vip, 8 years ago

sonar - squid:S1166 - Exception handlers should preserve the original exceptions

  • Property svn:eol-style set to native
File size: 5.5 KB
Line 
1// License: GPL. For details, see LICENSE file.
2package org.openstreetmap.josm.data.cache;
3
4import java.io.IOException;
5import java.net.URL;
6import java.util.Iterator;
7import java.util.Map;
8import java.util.concurrent.ConcurrentHashMap;
9import java.util.concurrent.LinkedBlockingDeque;
10import java.util.concurrent.Semaphore;
11import java.util.concurrent.TimeUnit;
12
13import org.openstreetmap.josm.Main;
14
15/**
16 * @author Wiktor Niesiobędzki
17 *
18 * Queue for ThreadPoolExecutor that implements per-host limit. It will acquire a semaphore for each task
19 * and it will set a runnable task with semaphore release, when job has finished.
20 *
21 * This implementation doesn't guarantee to have at most hostLimit connections per host[1], and it doesn't
22 * guarantee that all threads will be busy, when there is work for them[2].
23 *
24 * [1] More connection per host may happen, when ThreadPoolExecutor is growing its pool, and thus
25 * tasks do not go through the Queue
26 * [2] If we have a queue, and for all hosts in queue we will fail to acquire semaphore, the thread
27 * take the first available job and wait for semaphore. It might be the case, that semaphore was released
28 * for some task further in queue, but this implementation doesn't try to detect such situation
29 *
30 *
31 */
32public class HostLimitQueue extends LinkedBlockingDeque<Runnable> {
33 private static final long serialVersionUID = 1L;
34
35 private final Map<String, Semaphore> hostSemaphores = new ConcurrentHashMap<>();
36 private final int hostLimit;
37
38 /**
39 * Creates an unbounded queue
40 * @param hostLimit how many parallel calls to host to allow
41 */
42 public HostLimitQueue(int hostLimit) {
43 super(); // create unbounded queue
44 this.hostLimit = hostLimit;
45 }
46
47 private JCSCachedTileLoaderJob<?, ?> findJob() {
48 for (Iterator<Runnable> it = iterator(); it.hasNext();) {
49 Runnable r = it.next();
50 if (r instanceof JCSCachedTileLoaderJob) {
51 JCSCachedTileLoaderJob<?, ?> job = (JCSCachedTileLoaderJob<?, ?>) r;
52 if (tryAcquireSemaphore(job)) {
53 if (remove(job)) {
54 return job;
55 } else {
56 // we have acquired the semaphore, but we didn't manage to remove job, as someone else did
57 // release the semaphore and look for another candidate
58 releaseSemaphore(job);
59 }
60 } else {
61 URL url = null;
62 try {
63 url = job.getUrl();
64 } catch (IOException e) {
65 Main.debug(e);
66 }
67 Main.debug("TMS - Skipping job {0} because host limit reached", url);
68 }
69 }
70 }
71 return null;
72 }
73
74 @Override
75 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
76 Runnable job = findJob();
77 if (job != null) {
78 return job;
79 }
80 job = pollFirst(timeout, unit);
81 if (job != null) {
82 acquireSemaphore(job);
83 }
84 return job;
85 }
86
87 @Override
88 public Runnable take() throws InterruptedException {
89 Runnable job = findJob();
90 if (job != null) {
91 return job;
92 }
93 job = takeFirst();
94 if (job != null) {
95 acquireSemaphore(job);
96 }
97 return job;
98 }
99
100 private Semaphore getSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
101 String host;
102 try {
103 host = job.getUrl().getHost();
104 } catch (IOException e) {
105 // do not pass me illegal URL's
106 throw new IllegalArgumentException(e);
107 }
108 Semaphore limit = hostSemaphores.get(host);
109 if (limit == null) {
110 synchronized (hostSemaphores) {
111 limit = hostSemaphores.get(host);
112 if (limit == null) {
113 limit = new Semaphore(hostLimit);
114 hostSemaphores.put(host, limit);
115 }
116 }
117 }
118 return limit;
119 }
120
121 private void acquireSemaphore(Runnable job) throws InterruptedException {
122 if (job instanceof JCSCachedTileLoaderJob) {
123 final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job;
124 Semaphore limit = getSemaphore(jcsJob);
125 if (limit != null) {
126 limit.acquire();
127 jcsJob.setFinishedTask(new Runnable() {
128 @Override
129 public void run() {
130 releaseSemaphore(jcsJob);
131 }
132 });
133 }
134 }
135 }
136
137 private boolean tryAcquireSemaphore(final JCSCachedTileLoaderJob<?, ?> job) {
138 boolean ret = true;
139 Semaphore limit = getSemaphore(job);
140 if (limit != null) {
141 ret = limit.tryAcquire();
142 if (ret) {
143 job.setFinishedTask(new Runnable() {
144 @Override
145 public void run() {
146 releaseSemaphore(job);
147 }
148 });
149 }
150 }
151 return ret;
152 }
153
154 private void releaseSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
155 Semaphore limit = getSemaphore(job);
156 if (limit != null) {
157 limit.release();
158 if (limit.availablePermits() > hostLimit) {
159 Main.warn("More permits than it should be");
160 }
161 }
162 }
163}
Note: See TracBrowser for help on using the repository browser.