source: josm/trunk/src/org/openstreetmap/josm/data/cache/HostLimitQueue.java@ 8510

Last change on this file since 8510 was 8510, checked in by Don-vip, 9 years ago

checkstyle: enable relevant whitespace checks and fix them

File size: 5.0 KB
Line 
1// License: GPL. For details, see LICENSE file.
2package org.openstreetmap.josm.data.cache;
3
4import java.util.Iterator;
5import java.util.Map;
6import java.util.concurrent.ConcurrentHashMap;
7import java.util.concurrent.LinkedBlockingDeque;
8import java.util.concurrent.Semaphore;
9import java.util.concurrent.TimeUnit;
10
11import org.openstreetmap.josm.Main;
12
13/**
14 * @author Wiktor Niesiobędzki
15 *
16 * Queue for ThreadPoolExecutor that implements per-host limit. It will acquire a semaphore for each task
17 * and it will set a runnable task with semaphore release, when job has finished.
18 *
19 * This implementation doesn't guarantee to have at most hostLimit connections per host[1], and it doesn't
20 * guarantee that all threads will be busy, when there is work for them[2].
21 *
22 * [1] More connection per host may happen, when ThreadPoolExecutor is growing its pool, and thus
23 * tasks do not go through the Queue
24 * [2] If we have a queue, and for all hosts in queue we will fail to acquire semaphore, the thread
25 * take the first available job and wait for semaphore. It might be the case, that semaphore was released
26 * for some task further in queue, but this implementation doesn't try to detect such situation
27 *
28 *
29 */
30
31public class HostLimitQueue extends LinkedBlockingDeque<Runnable> {
32 private static final long serialVersionUID = 1L;
33
34 private final Map<String, Semaphore> hostSemaphores = new ConcurrentHashMap<>();
35 private final int hostLimit;
36
37 /**
38 * Creates an unbounded queue
39 * @param hostLimit how many parallel calls to host to allow
40 */
41 public HostLimitQueue(int hostLimit) {
42 super(); // create unbounded queue
43 this.hostLimit = hostLimit;
44 }
45
46 private JCSCachedTileLoaderJob<?, ?> findJob() {
47 for (Iterator<Runnable> it = iterator(); it.hasNext();) {
48 Runnable r = it.next();
49 if (r instanceof JCSCachedTileLoaderJob) {
50 JCSCachedTileLoaderJob<?, ?> job = (JCSCachedTileLoaderJob<?, ?>) r;
51 if (tryAcquireSemaphore(job)) {
52 if (remove(job)) {
53 return job;
54 } else {
55 // we have acquired the semaphore, but we didn't manage to remove it, as someone else did
56 // release the semaphore and look for another candidate
57 releaseSemaphore(job);
58 }
59 } else {
60 Main.info("TMS - Skipping job {0} because host limit reached", job.getUrl());
61 }
62 }
63 }
64 return null;
65 }
66
67 @Override
68 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
69 Runnable job = findJob();
70 if (job != null) {
71 return job;
72 }
73 job = pollFirst(timeout, unit);
74 if (job != null) {
75 acquireSemaphore(job);
76 }
77 return job;
78 }
79
80 @Override
81 public Runnable take() throws InterruptedException {
82 Runnable job = findJob();
83 if (job != null) {
84 return job;
85 }
86 job = takeFirst();
87 if (job != null) {
88 acquireSemaphore(job);
89 }
90 return job;
91 }
92
93 private Semaphore getSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
94 String host = job.getUrl().getHost();
95 Semaphore limit = hostSemaphores.get(host);
96 if (limit == null) {
97 synchronized (hostSemaphores) {
98 limit = hostSemaphores.get(host);
99 if (limit == null) {
100 limit = new Semaphore(hostLimit);
101 hostSemaphores.put(host, limit);
102 }
103 }
104 }
105 return limit;
106 }
107
108 private void acquireSemaphore(Runnable job) throws InterruptedException {
109 if (job instanceof JCSCachedTileLoaderJob) {
110 final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job;
111 Semaphore limit = getSemaphore(jcsJob);
112 if (limit != null) {
113 limit.acquire();
114 jcsJob.setFinishedTask(new Runnable() {
115 @Override
116 public void run() {
117 releaseSemaphore(jcsJob);
118 }
119 });
120 }
121 }
122 }
123
124 private boolean tryAcquireSemaphore(final JCSCachedTileLoaderJob<?, ?> job) {
125 boolean ret = true;
126 Semaphore limit = getSemaphore(job);
127 if (limit != null) {
128 ret = limit.tryAcquire();
129 if (ret) {
130 job.setFinishedTask(new Runnable() {
131 @Override
132 public void run() {
133 releaseSemaphore(job);
134 }
135 });
136 }
137 }
138 return ret;
139 }
140
141 private void releaseSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
142 Semaphore limit = getSemaphore(job);
143 if (limit != null) {
144 limit.release();
145 if (limit.availablePermits() > hostLimit) {
146 Main.warn("More permits than it should be");
147 }
148 }
149 }
150}
Note: See TracBrowser for help on using the repository browser.