Skip to content

observeOn.concatMap may not invoke the mapper function on the desired thread #6447

Closed
@akarnokd

Description

@akarnokd

There are two effects in play here: fusion and trampolining.

Fusion will take the observeOn queue and just pull on it when the subscription happens. Trampolining will use the last interacting thread (the subscription thread or the termination thread) to pull on the internal queue (dedicated or fused) and when there is an item, it will run the mapper on that thread.

Flowable.range(1, 5)
    .observeOn(Schedulers.computation())
    .concatMap(v -> Flowable.just(Thread.currentThread().toString()))
    .blockingSubscribe(System.out::println);

This will likely print computation, main, main, main, main.

Workarounds:

  • Use hide() between observeOn and concatMap to break fusion.
  • Use subscribeOn after concatMap to move the trampoline off the main thread.
  • Use defer+subscribeOn in the mapper function to calculate the actual Flowable on a desired thread, not the mapper thread.

Appeared on StackOverflow.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions