Memory-limited Java 8 parallelStream forEachOrdered

How can we process a stream in parallel and in order?  Sadly, there’s no way to guarantee output order without some waiting: either each element finishes before the next one starts, or completed results are held back until their turn comes.  So what can we do when the first part of the process is time-consuming and can be done in parallel, and there’s a second part which is quick, but needs to be done in order?

Furthermore, what if using all available threads would sometimes cause you to run out of memory?  How can you process as much as possible, as fast as possible, while still keeping things in order for the second part?

Below we describe a method for achieving this, but it is not 100% foolproof, and should be avoided if at all possible. This method is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

Before we describe our method, here are some alternatives to consider.

Alternative #1: parallelStream forEach

Find a way to handle the unordered output.  Use an approximation if you have to.
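For example, an order-independent reduction such as a sum doesn’t care which element finishes first. A minimal sketch (the toy `slowSquare` stands in for the slow part 1):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class UnorderedDemo {
    // Stand-in for the slow, parallelizable part 1.
    static long slowSquare(int x) {
        return (long) x * x;
    }

    static long sumOfSquares(List<Integer> inputs) {
        // forEach on a parallel stream makes no ordering guarantee, so the
        // downstream handler must be order-independent; an atomic sum is.
        final AtomicLong sum = new AtomicLong();
        inputs.parallelStream()
              .map(UnorderedDemo::slowSquare)
              .forEach(sum::addAndGet);
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(Arrays.asList(1, 2, 3, 4, 5))); // 55
    }
}
```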

Alternative #2: forEachOrdered

Forgo the parallelization.
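A sketch of what that looks like (the squaring lambda stands in for part 1):

```java
import java.util.Arrays;
import java.util.List;

public class OrderedDemo {
    static String process(List<Integer> inputs) {
        final StringBuilder out = new StringBuilder();
        // A sequential stream runs part 1 and part 2 one element at a time,
        // so order is trivially preserved, at the cost of any parallel speedup.
        inputs.stream()
              .map(x -> x * x)                               // the slow part 1
              .forEachOrdered(x -> out.append(x).append(' ')); // the ordered part 2
        return out.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList(3, 1, 2))); // 9 1 4
    }
}
```

Note that `inputs.parallelStream().map(part1).forEachOrdered(part2)` would also call part 2 in encounter order while running part 1 in parallel, but it may buffer arbitrarily many intermediate results while it waits for the earliest one, which is exactly the memory problem this post is about.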

Alternative #3: Array

Use an array to store the intermediate results; do not use a thread-unsafe growing collection like ArrayList. This only works if all your intermediate results fit in memory, and it requires an integer index for each input.  If you need to maintain your own index, you’ll also need to manage your own ExecutorService.  Something like this:

    // Manage our own thread pool, since we need an index per input.
    final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    // sizeOverestimate: any upper bound on the number of elements is fine; extra slots stay null
    final IntermediateResult[] intermediateResults = new IntermediateResult[sizeOverestimate(iterableInput)];
    int counter = 0;
    for (final Element element : iterableInput) {
        final int index = counter++;
        executor.submit(() -> {
            intermediateResults[index] = processPart1(element);
        });
    }
    executor.shutdown();
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    for (final IntermediateResult intermediateResult : intermediateResults) {
        if (intermediateResult != null)
            processPart2(intermediateResult);
    }
What if I have memory limitations?

Then use a batched version of the array method, where you run as much of part 1 as you can, then run part 2, then part 1 again, part 2, …
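A sketch of that batched approach, where `processPart1`, `processPart2`, and the batch size are placeholders you would substitute with your own:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BatchedArrayDemo {
    // Stand-ins for the slow parallel part and the fast ordered part.
    static int processPart1(int x) { return x * x; }
    static void processPart2(int x, StringBuilder out) { out.append(x).append(' '); }

    static String run(List<Integer> inputs, int batchSize) {
        final StringBuilder out = new StringBuilder();
        for (int start = 0; start < inputs.size(); start += batchSize) {
            final int end = Math.min(start + batchSize, inputs.size());
            // only one batch of intermediates is in memory at a time
            final int[] batch = new int[end - start];
            final ExecutorService executor =
                    Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
            // part 1: fill the batch in parallel; the array index preserves order
            for (int i = 0; i < batch.length; i++) {
                final int index = i;
                final int input = inputs.get(start + i);
                executor.submit(() -> batch[index] = processPart1(input));
            }
            executor.shutdown();
            try {
                executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            // part 2: drain the batch sequentially, in order
            for (final int intermediate : batch) {
                processPart2(intermediate, out);
            }
        }
        return out.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 2, 3, 4, 5), 2)); // 1 4 9 16 25
    }
}
```

Creating a pool per batch keeps the sketch simple; in real code you would reuse one pool across batches.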

What if I need to run part 2 while part 1 is running?

You can use a Semaphore to keep track of how much memory is in use and when it is freed. Keeping part 2 running in order gets a little more complicated, though.

This is overly complicated and possibly has bugs. It can deadlock: if enough later inputs grab permits that an earlier input still needs, the earlier future never completes and no permits are ever released. Use at your own risk.

    /**
     * @param part1      Time-consuming parallelizable part.  Should never return null.
     * @param part2      Fast sequential part.  Should be called in order.
     * @param permits    Number of permits that fit in memory.  A permit could represent one intermediate result, or one
     *                   byte of memory an intermediate needs (e.g. file contents).
     * @param getPermits Number of permits each Input's Intermediate needs, for dynamically sized Intermediates
     *                   (e.g. file contents).  If unknown, over-estimate so you don't run out of memory.
     */
    public static <Input, Intermediate> void forEachOrdered(final List<Input> inputs,
                                                            final Function<Input, Intermediate> part1,
                                                            final Consumer<Intermediate> part2,
                                                            final int permits,
                                                            final ToIntFunction<Input> getPermits) {
        // This keeps track of how much memory is available for the intermediate results, so we don't start part1(input)
        // if we won't have enough memory for the Intermediate result.  It has to be fair (FIFO), so a later request for
        // fewer permits can't jump the queue ahead of an earlier request for more permits
        final Semaphore semaphore = new Semaphore(permits, true);
        // This keeps track of the number of permits needed for each Input.  To save us from calling getPermits repeatedly.
        final List<Integer> ps = new ArrayList<>();
        // This keeps track of the submitted part1's and their order
        final List<Future<Intermediate>> futures = submitPart1(inputs, part1, getPermits, semaphore, ps);

        // part1 should now be running in parallel, and everything should be set to run part2 in order
        try {
            for (int i = 0, inputsSize = inputs.size(); i < inputsSize; i++) {
                // each Future with index i should correspond to running Input i and returning Intermediate i
                // also clear it to free memory
                final Future<Intermediate> future       = futures.set(i, null);
                final Intermediate         intermediate = future.get();
                // run part2!
                part2.accept(intermediate);
                // release the permits so more part1's can run
                semaphore.release(ps.get(i));
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
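For illustration, here is a self-contained, trimmed-down copy of the method above with a toy usage, where a String’s length stands in for the memory its intermediate needs. The permit count is deliberately generous, since, as noted, under-provisioning can deadlock:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.ToIntFunction;

public class ForEachOrderedDemo {
    // Trimmed-down copy of the post's forEachOrdered, for a runnable illustration.
    public static <Input, Intermediate> void forEachOrdered(final List<Input> inputs,
                                                            final Function<Input, Intermediate> part1,
                                                            final Consumer<Intermediate> part2,
                                                            final int permits,
                                                            final ToIntFunction<Input> getPermits) {
        final Semaphore semaphore = new Semaphore(permits, true);
        final List<Integer> ps = new ArrayList<>();
        final List<Future<Intermediate>> futures = new ArrayList<>();
        final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        for (final Input input : inputs) {
            final int p = getPermits.applyAsInt(input);
            ps.add(p);
            futures.add(executor.submit(() -> {
                semaphore.acquire(p);       // wait until the Intermediate fits in memory
                return part1.apply(input);
            }));
        }
        executor.shutdown();
        try {
            for (int i = 0; i < inputs.size(); i++) {
                part2.accept(futures.set(i, null).get()); // in order; clear to free memory
                semaphore.release(ps.get(i));
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    static String demo() {
        final StringBuilder out = new StringBuilder();
        // 16 permits exceeds the total demand (4 + 2 + 6 + 1), deliberately
        // avoiding the deadlock that under-provisioning can cause.
        forEachOrdered(Arrays.asList("aaaa", "bb", "cccccc", "d"),
                       String::toUpperCase,             // part 1: slow, parallel
                       s -> out.append(s).append(' '),  // part 2: fast, in order
                       16,
                       String::length);
        return out.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // AAAA BB CCCCCC D
    }
}
```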

    /**
     * Submits part1 to run in parallel, but only as many as are permitted can run simultaneously.
     * Also saves in the number of permits per input.
     *
     * @param part1      Time-consuming parallelizable part.  Should never return null.
     * @param getPermits For when you have dynamic-sized Intermediates. (e.g. file contents).  If unknown, over-estimate so you
     *                   don't run out of memory.
     * @param semaphore  This keeps track of how much memory is available for the intermediate results, so we don't start
     *                   part1(input) if we won't have enough memory for the Intermediate result.  It has to be Fair, meaning it
     *                   doesn't allow a less-memory-dependent intermediate to start before a more-memory-dependent intermediate
     * @param ps         This keeps track of the number of permits needed for each Input.  To save us from calling getPermits
     *                   repeatedly.
     * @return This keeps track of the submitted part1's and their order
     */
    @Nonnull
    private static <Input, Intermediate> List<Future<Intermediate>> submitPart1(final List<Input> inputs,
                                                                                final Function<Input, Intermediate> part1,
                                                                                final ToIntFunction<Input> getPermits,
                                                                                final Semaphore semaphore,
                                                                                final List<Integer> ps) {
        // This keeps track of the submitted part1's and their order
        final List<Future<Intermediate>> futures = new ArrayList<>();
        // We need to manage our own thread pool.  Use all available processors.
        final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        for (final Input input : inputs) {
            final int p = getPermits.applyAsInt(input);
            ps.add(p);
            futures.add(executor.submit(() -> {
                // This is not guaranteed to be called in input order, but it works for me.
                // If acquisition happens too far out of order, later inputs can take permits an earlier input needs,
                // and we stall: the earlier future never completes and no permits are ever released.
                semaphore.acquire(p);
                return part1.apply(input);
            }));
        }
        executor.shutdown();
        return futures;
    }
What if I have a part 3 which can be parallel like part 1?

As if the above code wasn’t complicated enough? Let’s call part 3 “part 2b”, and the sequential part “part 2a”. You still need part 2b to finish so you can release memory, I get it. So what we need to do is collect as many finished part 1 results as we can and send them in a batch to part 2; part 2 then runs part 2a sequentially and submits part 2b to a pool of threads:


    /**
     * For when batching part2 is good.
     *
     * @param part1        Time-consuming parallelizable part.  Should never return null.
     * @param part1Default Default intermediate, in case part1 throws an exception.
     * @param part2        Fast sequential part.  Should be called in order.
     * @param permits      Number of permits that fit in memory.  A permit could represent one intermediate result, or
     *                     one byte of memory an intermediate needs (e.g. file contents).
     * @param getPermits   Number of permits each Input's Intermediate needs, for dynamically sized Intermediates
     *                     (e.g. file contents).  If unknown, over-estimate so you don't run out of memory.
     */
    public static <Input, Intermediate> void forEachOrdered(final List<Input> inputs,
                                                            final Function<Input, Intermediate> part1,
                                                            @Nonnull final Intermediate part1Default,
                                                            final Consumer<List<Intermediate>> part2,
                                                            final int permits,
                                                            final ToIntFunction<Input> getPermits) {
        // This keeps track of how much memory is available for the intermediate results, so we don't start part1(input)
        // if we won't have enough memory for the Intermediate result.  It has to be fair (FIFO), so a later request for
        // fewer permits can't jump the queue ahead of an earlier request for more permits
        final Semaphore semaphore = new Semaphore(permits, true);
        // This keeps track of the number of permits needed for each Input.  To save us from calling getPermits repeatedly.
        final List<Integer> ps = new ArrayList<>();
        // This keeps track of the submitted part1's and their order
        final List<Future<Intermediate>> futures = submitPart1(inputs, part1, getPermits, semaphore, ps);

        // part1 should now be running in parallel, and everything should be set to run part2 in order
        try {
            // each Future with index i should correspond to running Input i and returning Intermediate i
            for (int i = 0, inputsSize = inputs.size(); i < inputsSize; i++) {
                // wait until this one is done
                futures.get(i).get();
                // find the next one which is not yet done
                int f = i + 1;
                while (f < inputsSize && futures.get(f).isDone()) {
                    f++;
                }
                // run part2!
                part2.accept(IntStream.range(i, f).mapToObj(j -> {
                    try {
                        // each Future with index j should correspond to running Input j and returning Intermediate j
                        // also clear it to free memory
                        return futures.set(j, null).get();
                    } catch (InterruptedException | ExecutionException e) {
                        return part1Default;
                    }
                }).collect(Collectors.toList()));
                // release the permits so more part1's can run
                semaphore.release(ps.subList(i, f).stream().mapToInt(Integer::intValue).sum());
                // skip the done ones
                i = f - 1;
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
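The shape of such a part 2, sketched in isolation (the batched forEachOrdered above would call it once per batch): part 2a appends in order on the calling thread, while part 2b fans back out to its own pool. All names here are placeholders. One caveat: the batched method releases the semaphore’s permits as soon as the consumer returns, so if part 2b still holds the intermediates, the memory isn’t truly free until part 2b finishes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class BatchPart2Demo {
    static String demo() {
        final StringBuilder ordered = new StringBuilder();              // part 2a output, in order
        final Queue<Integer> unordered = new ConcurrentLinkedQueue<>(); // part 2b output, any order
        final ExecutorService part2bPool = Executors.newFixedThreadPool(4);

        // The Consumer<List<Intermediate>> you would hand to the batched forEachOrdered:
        final Consumer<List<Integer>> part2 = batch -> {
            for (final Integer intermediate : batch) {
                ordered.append(intermediate).append(' ');                    // part 2a: sequential
                part2bPool.submit(() -> unordered.add(intermediate * 10));   // part 2b: parallel
            }
        };

        // Simulate two batches arriving from part 1.
        part2.accept(Arrays.asList(1, 2));
        part2.accept(Arrays.asList(3));
        part2bPool.shutdown();
        try {
            part2bPool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return ordered.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 1 2 3
    }
}
```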
What if I need to guarantee no deadlock?

In that case, you need to guarantee that the semaphore’s permits are acquired in input order. You can do this by guaranteeing that only one thread at a time tries to acquire permits, and that the thread doing so is the earliest one you queued. Something like this:

    /**
     * Submits part1 to run in parallel, but only as many as are permitted can run simultaneously.
     * Also saves in the number of permits per input.
     *
     * @param part1      Time-consuming parallelizable part.  Should never return null.
     * @param getPermits For when you have dynamic-sized Intermediates. (e.g. file contents).  If unknown, over-estimate so you
     *                   don't run out of memory.
     * @param semaphore  This keeps track of how much memory is available for the intermediate results, so we don't start
     *                   part1(input) if we won't have enough memory for the Intermediate result.  It has to be Fair, meaning it
     *                   doesn't allow a less-memory-dependent intermediate to start before a more-memory-dependent intermediate
     * @param ps         This keeps track of the number of permits needed for each Input.  To save us from calling getPermits
     *                   repeatedly.
     * @return This keeps track of the submitted part1's and their order
     */
    @Nonnull
    private static <Input, Intermediate> List<Future<Intermediate>> submitPart1(final List<Input> inputs,
                                                                                final Function<Input, Intermediate> part1,
                                                                                final ToIntFunction<Input> getPermits,
                                                                                final Semaphore semaphore,
                                                                                final List<Integer> ps) {
        // This keeps track of the submitted part1's and their order
        final List<Future<Intermediate>> futures = new ArrayList<>();
        // We need to manage our own thread pool.  Use all available processors.
        final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        // used to keep track of which one should be allowed to start running
        final AtomicInteger   current  = new AtomicInteger(0);
        // used to make sure the current one is the one that is permitted to run
        final Object          lock     = new Object();
        for (int i = 0, inputsSize = inputs.size(); i < inputsSize; i++) {
            final int   finalI = i;
            final Input input  = inputs.get(i);
            final int   p      = getPermits.applyAsInt(input);
            ps.add(p);
            futures.add(executor.submit(() -> {
                // busy-wait until we are the current one; note that the waiting pool threads burn CPU here
                boolean isCurrent;
                do {
                    // hold the lock so checking whether we are current and acquiring the permits happen atomically
                    synchronized (lock) {
                        // only the task whose index equals current may advance it and proceed
                        isCurrent = current.compareAndSet(finalI, finalI + 1);
                        if (isCurrent)
                            // we hold the lock, so we are the only thread acquiring, and the earliest one queued
                            semaphore.acquire(p);
                    }
                } while (!isCurrent);
                // run part1!
                return part1.apply(input);
            }));
        }
        executor.shutdown();
        return futures;
    }