Changeset 18821 in josm


Ignore:
Timestamp:
2023-08-24T17:23:11+02:00 (11 months ago)
Author:
taylor.smock
Message:

Fix #23140: RejectedExecutionException when MultiFetchServerObjectReader is cancelled while creating download jobs

This was caused by a race condition.
User starts a download, and cancels it as jobs are submitted to the thread pool.
The next job to be submitted will cause a RejectedExecutionException. In order
to fix this, we ensure that we shutdown the thread pool inside a synchronized
block, and add jobs to the executor inside a synchronized block.

Location:
trunk
Files:
2 added
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/org/openstreetmap/josm/io/MultiFetchServerObjectReader.java

    r18129 r18821  
    4848 * Retrieves a set of {@link OsmPrimitive}s from an OSM server using the so called
    4949 * Multi Fetch API.
    50  *
     50 * <p>
    5151 * Usage:
    5252 * <pre>
     
    128128     * Remembers an {@link OsmPrimitive}'s id. The id will
    129129     * later be fetched as part of a Multi Get request.
    130      *
     130     * <p>
    131131     * Ignore the id if it represents a new primitives.
    132132     *
     
    326326        CompletionService<FetchResult> ecs = new ExecutorCompletionService<>(exec);
    327327        List<Future<FetchResult>> jobs = new ArrayList<>();
    328         while (!toFetch.isEmpty() && !isCanceled()) {
    329             jobs.add(ecs.submit(new Fetcher(type, extractIdPackage(toFetch), progressMonitor)));
     328        // There exists a race condition where this is cancelled after isCanceled is called, such that
     329        // the exec ThreadPool has been shut down. This can cause a RejectedExecutionException.
     330        synchronized (this) {
     331            while (!toFetch.isEmpty() && !isCanceled()) {
     332                jobs.add(ecs.submit(new Fetcher(type, extractIdPackage(toFetch), progressMonitor)));
     333            }
    330334        }
    331335        // Run the fetchers
     
    348352                }
    349353            } catch (InterruptedException | ExecutionException e) {
     354                if (e instanceof InterruptedException) {
     355                    Thread.currentThread().interrupt();
     356                }
    350357                Logging.error(e);
    351358                if (e.getCause() instanceof OsmTransferException)
     
    369376     * the latest version of the primitive (if any), even if the primitive is not visible (i.e. if
    370377     * visible==false).
    371      *
     378     * <p>
    372379     * Invoke {@link #getMissingPrimitives()} to get a list of primitives which have not been
    373380     * found on  the server (the server response code was 404)
     
    589596                    }
    590597                    if (pkg.size() == 1) {
    591                         FetchResult res = new FetchResult(new DataSet(), new HashSet<PrimitiveId>());
     598                        FetchResult res = new FetchResult(new DataSet(), new HashSet<>());
    592599                        res.missingPrimitives.add(new SimplePrimitiveId(pkg.iterator().next(), type));
    593600                        return res;
     
    668675         * invokes a sequence of Multi Gets for individual ids in a set of ids and a given {@link OsmPrimitiveType}.
    669676         * The retrieved primitives are merged to {@link #outputDataSet}.
    670          *
     677         * <p>
    671678         * This method is used if one of the ids in pkg doesn't exist (the server replies with return code 404).
    672679         * If the set is fetched with this method it is possible to find out which of the ids doesn't exist.
     
    682689        protected FetchResult singleGetIdPackage(OsmPrimitiveType type, Set<Long> pkg, ProgressMonitor progressMonitor)
    683690                throws OsmTransferException {
    684             FetchResult result = new FetchResult(new DataSet(), new HashSet<PrimitiveId>());
     691            FetchResult result = new FetchResult(new DataSet(), new HashSet<>());
    685692            String baseUrl = OsmApi.getOsmApi().getBaseUrl();
    686693            for (long id : pkg) {
     
    713720    public void cancel() {
    714721        super.cancel();
    715         if (exec != null)
    716             exec.shutdownNow();
     722        // Synchronized to avoid a RejectedExecutionException in fetchPrimitives
     723        // We don't want to synchronize on the super.cancel() call.
     724        synchronized (this) {
     725            if (exec != null) {
     726                exec.shutdownNow();
     727            }
     728        }
    717729    }
    718730}
  • trunk/test/functional/org/openstreetmap/josm/io/MultiFetchServerObjectReaderTest.java

    r18691 r18821  
    55import static org.junit.jupiter.api.Assertions.assertFalse;
    66import static org.junit.jupiter.api.Assertions.assertNotNull;
     7import static org.junit.jupiter.api.Assertions.assertNull;
    78import static org.junit.jupiter.api.Assertions.assertTrue;
    8 import static org.junit.jupiter.api.Assumptions.assumeTrue;
    99
    1010import java.io.File;
     
    2323import java.util.Random;
    2424import java.util.TreeSet;
     25import java.util.concurrent.RejectedExecutionException;
    2526import java.util.concurrent.TimeUnit;
     27import java.util.concurrent.atomic.AtomicBoolean;
     28import java.util.concurrent.atomic.AtomicInteger;
     29import java.util.concurrent.atomic.AtomicReference;
    2630import java.util.logging.Logger;
    2731
     32import org.awaitility.Awaitility;
     33import org.awaitility.Durations;
     34import org.awaitility.core.ConditionTimeoutException;
     35import org.hamcrest.Matchers;
    2836import org.junit.jupiter.api.BeforeAll;
    2937import org.junit.jupiter.api.BeforeEach;
    3038import org.junit.jupiter.api.Test;
    3139import org.junit.jupiter.api.Timeout;
    32 import org.junit.jupiter.api.extension.RegisterExtension;
    33 import org.openstreetmap.josm.JOSMFixture;
    34 import org.openstreetmap.josm.TestUtils;
    3540import org.openstreetmap.josm.data.coor.LatLon;
    3641import org.openstreetmap.josm.data.osm.Changeset;
     
    4348import org.openstreetmap.josm.data.osm.Way;
    4449import org.openstreetmap.josm.gui.progress.NullProgressMonitor;
    45 import org.openstreetmap.josm.spi.preferences.Config;
    46 import org.openstreetmap.josm.testutils.JOSMTestRules;
     50import org.openstreetmap.josm.gui.util.GuiHelper;
     51import org.openstreetmap.josm.testutils.annotations.TestUser;
     52import org.openstreetmap.josm.tools.Logging;
    4753
    4854import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
     
    5359@SuppressFBWarnings(value = "CRLF_INJECTION_LOGS")
    5460@Timeout(value = 1, unit = TimeUnit.MINUTES)
     61@org.openstreetmap.josm.testutils.annotations.OsmApi(org.openstreetmap.josm.testutils.annotations.OsmApi.APIType.DEV)
     62@TestUser
    5563class MultiFetchServerObjectReaderTest {
    5664    private static final Logger logger = Logger.getLogger(MultiFetchServerObjectReader.class.getName());
    57 
    58     /**
    59      * Setup test.
    60      */
    61     @RegisterExtension
    62     @SuppressFBWarnings(value = "URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
    63     public JOSMTestRules test = new JOSMTestRules().preferences();
    6465
    6566    /**
     
    159160    @BeforeAll
    160161    public static void init() throws Exception {
    161         if (!TestUtils.areCredentialsProvided()) {
    162             logger.severe("OSM DEV API credentials not provided. Please define them with -Dosm.username and -Dosm.password");
    163             return;
    164         }
    165162        logger.info("initializing ...");
    166         JOSMFixture.createFunctionalTestFixture().init();
    167 
    168         Config.getPref().put("osm-server.auth-method", "basic");
    169 
    170         // don't use atomic upload, the test API server can't cope with large diff uploads
    171         Config.getPref().putBoolean("osm-server.atomic-upload", false);
    172163
    173164        File dataSetCacheOutputFile = new File(System.getProperty("java.io.tmpdir"),
     
    213204    @BeforeEach
    214205    public void setUp() throws IOException, IllegalDataException, FileNotFoundException {
    215         if (!TestUtils.areCredentialsProvided()) {
    216             return;
    217         }
    218206        File f = new File(System.getProperty("java.io.tmpdir"), MultiFetchServerObjectReaderTest.class.getName() + ".dataset");
    219207        logger.info(MessageFormat.format("reading cached dataset ''{0}''", f.toString()));
     
    230218    @Test
    231219    void testMultiGet10Nodes() throws OsmTransferException {
    232         assumeTrue(TestUtils.areCredentialsProvided());
    233220        MultiFetchServerObjectReader reader = new MultiFetchServerObjectReader();
    234221        ArrayList<Node> nodes = new ArrayList<>(ds.getNodes());
     
    252239    @Test
    253240    void testMultiGet10Ways() throws OsmTransferException {
    254         assumeTrue(TestUtils.areCredentialsProvided());
    255241        MultiFetchServerObjectReader reader = new MultiFetchServerObjectReader();
    256242        ArrayList<Way> ways = new ArrayList<>(ds.getWays());
     
    275261    @Test
    276262    void testMultiGet10Relations() throws OsmTransferException {
    277         assumeTrue(TestUtils.areCredentialsProvided());
    278263        MultiFetchServerObjectReader reader = new MultiFetchServerObjectReader();
    279264        ArrayList<Relation> relations = new ArrayList<>(ds.getRelations());
     
    298283    @Test
    299284    void testMultiGet800Nodes() throws OsmTransferException {
    300         assumeTrue(TestUtils.areCredentialsProvided());
    301285        MultiFetchServerObjectReader reader = new MultiFetchServerObjectReader();
    302286        ArrayList<Node> nodes = new ArrayList<>(ds.getNodes());
     
    320304    @Test
    321305    void testMultiGetWithNonExistingNode() throws OsmTransferException {
    322         assumeTrue(TestUtils.areCredentialsProvided());
    323306        MultiFetchServerObjectReader reader = new MultiFetchServerObjectReader();
    324307        ArrayList<Node> nodes = new ArrayList<>(ds.getNodes());
     
    349332        assertEquals("ways?ways=123,126,130", requestString);
    350333    }
     334
     335    /**
     336     * This is a non-regression test for #23140: Cancelling `MultiFetchServerObjectReader` while it is adding jobs
     337     * to the executor causes a {@link RejectedExecutionException}.
     338     * This was caused by a race condition between {@link MultiFetchServerObjectReader#cancel()} and queuing download
     339     * jobs.
     340     */
     341    @Test
     342    void testCancelDuringJobAdd() {
     343        final AtomicBoolean parsedData = new AtomicBoolean();
     344        final AtomicBoolean continueAddition = new AtomicBoolean();
     345        final AtomicInteger callCounter = new AtomicInteger();
     346        final AtomicReference<Throwable> thrownFailure = new AtomicReference<>();
     347        // We have 5 + 10 maximum (5 previous calls, 10 calls when iterating through the nodes).
     348        final int expectedCancelCalls = 5;
     349        final MultiFetchServerObjectReader reader = new MultiFetchServerObjectReader() {
     350            @Override
     351            public boolean isCanceled() {
     352                final boolean result = super.isCanceled();
     353                // There are some calls prior to the location where we are interested
     354                if (callCounter.incrementAndGet() >= expectedCancelCalls) {
     355                    // This will throw a ConditionTimeoutException.
     356                    // By blocking here until cancel() is called, we block cancel (since we are interested in a loop).
     357                    Awaitility.await().timeout(Durations.FIVE_HUNDRED_MILLISECONDS).untilTrue(continueAddition);
     358                }
     359                return result;
     360            }
     361        };
     362        ArrayList<Node> nodes = new ArrayList<>(ds.getNodes());
     363        for (int i = 0; i < 10; i++) {
     364            reader.append(nodes.get(i));
     365        }
     366        GuiHelper.runInEDT(() -> {
     367                try {
     368                    reader.parseOsm(NullProgressMonitor.INSTANCE);
     369                } catch (ConditionTimeoutException timeoutException) {
     370                    // This is expected due to the synchronization, so we just swallow it.
     371                    Logging.trace(timeoutException);
     372                } catch (Exception failure) {
     373                    thrownFailure.set(failure);
     374                } finally {
     375                    parsedData.set(true);
     376                }
     377            });
     378        // cancel, then continue
     379        Awaitility.await().untilAtomic(callCounter, Matchers.greaterThanOrEqualTo(expectedCancelCalls));
     380        reader.cancel();
     381        continueAddition.set(true);
     382        Awaitility.await().untilTrue(parsedData);
     383        if (thrownFailure.get() != null) {
     384            Logging.error(thrownFailure.get());
     385        }
     386        assertNull(thrownFailure.get());
     387        assertEquals(expectedCancelCalls, callCounter.get());
     388    }
    351389}
Note: See TracChangeset for help on using the changeset viewer.