RxJava

RxJava

Библиотека для асинхронного программирования на Java, основанная на концепциях функционального программирования и реактивного программирования. Основная цель RxJava — облегчить работу с асинхронными потоками данных и событиями.

• Cold observable. Создаются по запросу и выдают данные при подписке на них. Пример: retrofit.

• Hot observable. Всегда активны и выдают данные независимо от того подписаны на них или нет. Пример: обработка жестов view.

Threading Operators

subscribeOn

Определяет поток, на котором будет выполнено создание Observable и запуск его операций. Влияет на операторы выше. Если указан только subscribeOn - все операторы будут выполняться в указанном потоке.

Observable.just("item")
    .subscribeOn(Schedulers.io()) // Выполнение Observable на IO-потоке
    .observeOn(AndroidSchedulers.mainThread()) // Обработка результатов на основном потоке
    .subscribe(
        item -> System.out.println("Received: " + item),
        Throwable::printStackTrace
    );

observeOn

Определяет поток, на котором будет производиться обработка элементов и уведомление об их изменении. Влияет на операторы ниже. Если указан только observeOn - все операторы будут выполняться в текущем потоке, а операторы, расположенные ниже observeOn будут переключаться на поток, указанный в observeOn.

Observable.just("item")
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation()) // Обработка результатов на вычислительном потоке
    .map(item -> item.toUpperCase())
    .observeOn(AndroidSchedulers.mainThread()) // Обработка окончательных результатов на основном потоке
    .subscribe(
        item -> System.out.println("Processed: " + item),
        Throwable::printStackTrace
    );

doOnNext

Действие с каждым элементом. Выполняет указанное действие для каждого элемента, который проходит через Observable, перед тем как он будет передан дальше.

observable.doOnNext { item -> println("Item: $item") }

doOnComplete

Действие при завершении потока. Выполняет указанное действие, когда Observable завершает эмитирование всех элементов. Не вызывается, если поток был прерван с ошибкой.

observable.doOnComplete { println("Stream completed") }

doOnError

Действие при ошибке. Выполняет указанное действие, когда возникает ошибка в процессе работы Observable.

observable.doOnError { error -> println("Error occurred: $error") }

doOnSubscribe

Действие при подписке. Выполняет указанное действие, когда подписка на Observable начинается.

observable.doOnSubscribe { disposable -> println("Subscribed: $disposable") }

doAfterTerminate

Действие после завершения работы Observable. Выполняет указанное действие после завершения работы Observable, независимо от того, завершился ли поток успешно или с ошибкой.

observable.doAfterTerminate { println("Stream terminated") }

doOnDispose

Действие при отписке. Выполняет указанное действие, когда Disposable объект используется для отписки от Observable.

observable.doOnDispose { println("Disposed") }
Transformation Operators

Функции, которые позволяют трансформировать, фильтровать и управлять потоками данных, поступающими из Observable.

just

Создает Observable, который излучает один элемент.

Observable.just("Hello, World!")
    .subscribe(System.out::println);

fromIterable

Создает Observable из коллекции.

List<String> list = Arrays.asList("One", "Two", "Three");
Observable.fromIterable(list)
    .subscribe(System.out::println);

map

Применяет функцию к каждому элементу.

Observable.just(1, 2, 3)
    .map(i -> i * 2)
    .subscribe(System.out::println);  // Output: 2, 4, 6

flatMap

Преобразует каждый элемент в Observable и объединяет результаты. Порядок генерированных элементов не гарантируется.

Observable.just(1, 2, 3)
    .flatMap(i -> Observable.just(i, i * 2))
    .subscribe(System.out::println);  // Output: 1, 2, 2, 4, 3, 6

concatMap

Преобразует каждый элемент в Observable и объединяет результаты, сохраняя порядок.

Observable.just(1, 2, 3)
    .concatMap(i -> Observable.just(i, i * 2))
    .subscribe(System.out::println);  // Output: 1, 2, 2, 4, 3, 6

switchMap

Переключается на новый Observable каждый раз, когда приходит новый элемент, отменяя предыдущие Observable.

Observable<String> searchRequests = Observable.just("query1", "query2", "query3");

searchRequests
    .switchMap(query -> performSearch(query))
    .subscribe(
        result -> System.out.println("Result: " + result),
        Throwable::printStackTrace,
        () -> System.out.println("Completed")
    );

Observable<String> performSearch(String query) {
    return Observable.just("Results for " + query)
                     .delay(1, TimeUnit.SECONDS);
}

filter

Пропускает элементы, которые не удовлетворяют условию.

Observable.just(1, 2, 3, 4, 5)
    .filter(i -> i % 2 == 0)
    .subscribe(System.out::println);  // Output: 2, 4

skip

Пропускает первые N элементов в Observable и затем передает оставшиеся элементы.

Observable<Integer> source = Observable.range(1, 5);
source.skip(2) // Пропускает первые 2 элемента
    .subscribe(System.out::println); // Output: 3, 4, 5

skipLast

Этот оператор пропускает последние N элементов в Observable и передает все остальные.

Observable<Integer> source = Observable.range(1, 5);
source.skipLast(2) // Пропускает последние 2 элемента
    .subscribe(System.out::println); // Output: 1, 2, 3

take

Берет только первые count элементов.

Observable.just(1, 2, 3, 4, 5)
    .take(3)
    .subscribe(System.out::println);  // Output: 1, 2, 3

takeLast

Берет последние count элементов.

Observable.just(1, 2, 3, 4, 5)
    .takeLast(3)
    .subscribe(System.out::println);  // Output: 3, 4, 5

merge

Объединяет несколько Observable в один.

Observable.merge(
    Observable.just(1, 2),
    Observable.just(3, 4)
).subscribe(System.out::println);  // Output: 1, 2, 3, 4

concat

Объединяет несколько Observable последовательно, сохраняя порядок элементов.

Observable.concat(
    Observable.just(1, 2),
    Observable.just(3, 4)
).subscribe(System.out::println);  // Output: 1, 2, 3, 4

zip

Объединяет элементы из нескольких Observable в пары.

Observable.zip(
    Observable.just(1, 2),
    Observable.just("A", "B"),
    (i, s) -> i + s
).subscribe(System.out::println);  // Output: 1A, 2B

combineLatest

Комбинирует последние значения из нескольких Observable при каждом обновлении любого из них.

Observable.combineLatest(
    Observable.just(1, 2).startWith(0),
    Observable.just("A", "B"),
    (i, s) -> i + s
).subscribe(System.out::println);  // Output: 0A, 1A, 1B, 2B

join

Объединяет элементы из двух Observable на основе определенного условия.

Observable<String> source1 = Observable.just("A", "B", "C");
Observable<Integer> source2 = Observable.just(1, 2, 3);

source1.join(
    source2,
    item -> Observable.timer(1, TimeUnit.SECONDS), // Соединение через Observable
    item -> Observable.timer(1, TimeUnit.SECONDS), // Соединение через Observable
    (s, i) -> s + i // Комбинирование элементов
).subscribe(System.out::println);

onErrorResumeNext

Возвращает новый Observable при возникновении ошибки.

Observable.just(1, 2, 3)
    .map(i -> {
        if (i == 2) throw new RuntimeException("Error!");
        return i;
    })
    .onErrorResumeNext(Observable.just(10, 20))
    .subscribe(System.out::println);  // Output: 1, 10, 20

onErrorReturnItem

Возвращает заданный элемент при возникновении ошибки.

Observable.just(1, 2, 3)
    .map(i -> {
        if (i == 2) throw new RuntimeException("Error!");
        return i;
    })
    .onErrorReturnItem(-1)
    .subscribe(System.out::println);  // Output: 1, -1

retry

Повторяет попытку при возникновении ошибки.

Observable.just(1, 2, 3)
    .map(i -> {
        if (i == 2) throw new RuntimeException("Error!");
        return i;
    })
    .retry(1)
    .subscribe(System.out::println);  // Output: 1, 2, 3

retryWhen

Позволяет управлять повторными попытками при возникновении ошибки.

Observable.just(1, 2, 3)
    .map(i -> {
        if (i == 2) throw new RuntimeException("Error!");
        return i;
    })
    .retryWhen(errors -> errors.take(1).flatMap(error -> Observable.just(4)))
    .subscribe(System.out::println);  // Output: 1, 2, 4, 3

delay

Задерживает эмиссию элементов.

Observable.just(1, 2, 3)
    .delay(1, TimeUnit.SECONDS)
    .subscribe(System.out::println);  // Output: 1, 2, 3 (через 1 секунду)

debounce

Эмитирует элементы, которые не были заменены другим элементом в течение заданного времени.

Observable.create(emitter -> {
    emitter.onNext("A");
    Thread.sleep(100);
    emitter.onNext("B");
    Thread.sleep(100);
    emitter.onNext("C");
    emitter.onComplete();
}).debounce(150, TimeUnit.MILLISECONDS)
  .subscribe(System.out::println);  // Output: C

scan

Применяет функцию накопления к каждому элементу и предыдущему результату.

Observable.just(1, 2, 3)
    .scan(0, (total, next) -> total + next)
    .subscribe(System.out::println);  // Output: 0, 1, 3, 6

distinct

Удаляет дублирующиеся элементы.

Observable.just(1, 1, 2, 2, 3, 3)
    .distinct()
    .subscribe(System.out::println);  // Output: 1, 2, 3
Streams

Observable

Представляет стрим объектов. Подписчики на Observable имеют коллбэки onNext(value), onComplete(), onError(throwable). onNext() может не вызываться, или вызываться произвольное количество раз. При завершении стрима вызывается onComplete() или onError().

Single

Отправляет объект, который принимается в коллбэке onSuccess(value), или бросает исключение в коллбэк onError(throwable) в случае ошибки.

Completable

Не возвращает никакого значения. На подписчиках вызывается onComplete() при удачном завершении или onError(throwable) в случае ошибки.

Maybe

Может отработать как Single или как Completable. На подписчиках вызывается один из трех коллбэков: onSuccess(value), onComplete() без какого-либо значения, или onError(throwable). Каждый из коллбэков может быть вызван один раз или не вызван вообще.

Schedulers

Потоки в RxJava выполняются с помощью планировщиков. Scheduler управляет одним или несколькими потоками. Когда планировщику необходимо выполнить задачу он берет поток из своего пула и запускает в нем задачу.

Schedulers.io

Поддерживается неограниченным пулом потоков. Используется для операций ввода-вывода, не требующих интенсивного использования ЦП (взаимодействие с файловой системой, сетевые вызовы, базы данных).

Schedulers.computation

Поддерживается ограниченным пулом потоков, размер которого не больше количества доступных процессоров (Runtime.availableProcessors). Использовать для вычислительных и ресурсоемких работ (изменение размера изображений, обработка больших наборов данных). Не делать таких потоков больше, чем доступно ядер, иначе производительность будет снижаться из-за переключения контекста и накладных расходов на создание потоков.

Schedulers.newThread

Создает новый поток для каждой запланированной задачи. Это планировщик стоит дорого, так как не происходит повторного переиспользования потока.

Schedulers.from(Executor)

Создает настраиваемый планировщик, поддерживаемый указанным executor. Если задача запланирована, когда все потоки заняты - будет поставлена в очередь.

Scheduler.from(Executors.newFixedThreadPool(n)) // ограничить количество одновременных потоков в пуле

Schedulers.single

Поддерживается одним потоком, выполняет задачи последовательно в указанном порядке.

Schedulers.trampoline

Выполняет задачи в порядке FIFO (first in first out) одним из рабочих потоков. Используется при реализации рекурсии, чтобы не увеличивался стек вызовов.

AndroidSchedulers.mainThread

Основной поток - место, где происходит взаимодействие с пользователем.

Disposable

Disposable

Интерфейс с методами dispose() и isDisposed(). Работае как оболочка над другими операторами.

CompositeDisposable

Объединяет несколько disposable, которые могут быть удалены одновременно.

Flowable

Flowable

Работает как Observable, но поддерживает backpressure по умолчанию.

Backpressure

Ситуация, когда observable выдает элементы быстрее, чем оператор или подписчик может использовать. Observable имеет бесконечный буфер, в который будут добавляться элементы до тех пор, пока не случится OutOfMemoryError. BackpressureStrategy – это enum, который задает стратегию обработки backpressure.

BackpressureStrategy.MISSINGстратегия не установлена
BackpressureStrategy.ERRORв случае backpressure бросает исключение MissingBackpressureException
BackpressureStrategy.BUFFERбуфер будет расширяться каждый раз, когда ему потребуется больше памяти. Его первоначальный размер составляет 128 элементов
BackpressureStrategy.DROPесли потребитель занят обработкой данных, эта стратегия буквально отбрасывает все значения, которые не могут быть обработаны
BackpressureStrategy.LATESTтакже отбрасывает неиспользуемые значения, но кэширует последнее. Его можно сравнить с БУФЕРОМ с постоянным размером в 1 элемент. Благодаря этому, когда потребитель запрашивает новое значение, он всегда получает самое последнее
Flowable.create<Int>({ emitter: FlowableEmitter<Int> ->
    repeat(1_000) { count -> emitter.onNext(count) }
}, BackpressureStrategy.BUFFER)
    .subscribe { println(it) }
Subjects

Subject – это абстрактный класс в RxJava, одновременно расширяющий класс Observable и реализующий интерфейс Observer. Subject – это hot observable. Виды: Publish, Replay, Behavior, Async, Unicast.

PublishSubjectПодписчики получают новые данные с момента подписки. PublishSubject не кэширует и не рассылает прошлые элементы (студент слушает лекцию с того момента, как вошел в аудиторию).
ReplaySubjectПодписчики получают все данные, отправленные до подписки и все новые данные с момента подписки (студент опоздал на лекцию, но получает ее содержимое с самого начала). Если ReplaySubject создается фабричным методом createWithSize(size: Int), то подписчики будут получать только заданное количество элементов, отправленных в прошлом.
BehaviorSubjectПодписчики получат последний элемент данных, разосланный до подписки и все новые данные с момента подписки.
AsyncSubjectПодписчики получат только самый последний элемент данных, который был отправлен перед вызовом onComplete (студент пришел на пару и получил только домашку).
UnicastSubjectРаботает также как ReplaySubject но может иметь только одного подписчика. Все последующие подписчики получают onError с IllegalStateException.
RxJava. Вопросы на собесе
  1. Какие основные операторы существуют в RxJava?
  1. Что такое Observable и как оно работает?
  1. В чем разница между Observable и Single?
  1. Что такое Subject и чем оно отличается от Observable?
  1. Как объединить несколько Observable потоков в один?
  1. Как обрабатывать ошибки в RxJava?
  1. Что такое backpressure и как его обрабатывать в RxJava?
  1. Что такое Disposable и как управлять жизненным циклом Observable?
  1. Как сделать параллельные запросы в RxJava?

    Использовать операторы zip() или merge().

  1. Почему возникает UndeliverableException?

    Возникает, когда исключение не может быть доставлено подписчику, обычно из-за завершения подписки или отсутствия обработчика ошибок. Оно может возникнуть, если ошибка происходит после завершения подписки или если ошибка не обрабатывается в методе onError. Для предотвращения можно настроить глобальный обработчик ошибок с помощью RxJavaPlugins.setErrorHandler.

  1. Какой оператор используется для объединения данных из двух Observable на основе условия?

    join

    combineLatest

    merge

    zip

  1. Какой тип Subject хранит только последний элемент и передает его новым подписчикам?

    PublishSubject

    BehaviorSubject

    ReplaySubject

    AsyncSubject

  1. Какой оператор используется для объединения нескольких Observable, при этом дожидаясь завершения всех источников?

    zip

    combineLatest

    concat

    merge

  1. Какой из следующих классов позволяет управлять подписками в RxJava?

    Disposable

    Observer

    Publisher

    Subscriber

  1. Какой оператор используется для выполнения действия после окончания работы Observer?

    doAfterComplete

    doOnNext

    doOnComplete

    doOnTerminate

  1. Какой оператор используется для пропуска первых N элементов, излучаемых Observable?

    ignore

    filter

    take

    skip