RxJava

RxJava

Библиотека для асинхронного программирования на Java, основанная на концепциях функционального программирования и реактивного программирования. Основная цель RxJava — облегчить работу с асинхронными потоками данных и событиями.

• Cold observable. Создаются по запросу и выдают данные при подписке на них. Пример: retrofit.

• Hot observable. Всегда активны и выдают данные независимо от того подписаны на них или нет. Пример: обработка жестов view.

Threading Operators
subscribeOn

Определяет поток, на котором будет выполнено создание Observable и запуск его операций. Влияет на операторы выше. Если указан только subscribeOn - все операторы будут выполняться в указанном потоке.

Observable.just("item")
    .subscribeOn(Schedulers.io()) // Выполнение Observable на IO-потоке
    .observeOn(AndroidSchedulers.mainThread()) // Обработка результатов на основном потоке
    .subscribe(
        item -> System.out.println("Received: " + item),
        Throwable::printStackTrace
    );
observeOn

Определяет поток, на котором будет производиться обработка элементов и уведомление об их изменении. Влияет на операторы ниже. Если указан только observeOn - все операторы будут выполняться в текущем потоке, а операторы, расположенные ниже observeOn будут переключаться на поток, указанный в observeOn.

Observable.just("item")
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation()) // Обработка результатов на вычислительном потоке
    .map(item -> item.toUpperCase())
    .observeOn(AndroidSchedulers.mainThread()) // Обработка окончательных результатов на основном потоке
    .subscribe(
        item -> System.out.println("Processed: " + item),
        Throwable::printStackTrace
    );
doOnNext

Действие с каждым элементом. Выполняет указанное действие для каждого элемента, который проходит через Observable, перед тем как он будет передан дальше.

observable.doOnNext { item -> println("Item: $item") }
doOnComplete

Действие при завершении потока. Выполняет указанное действие, когда Observable завершает эмитирование всех элементов. Не вызывается, если поток был прерван с ошибкой.

observable.doOnComplete { println("Stream completed") }
doOnError

Действие при ошибке. Выполняет указанное действие, когда возникает ошибка в процессе работы Observable.

observable.doOnError { error -> println("Error occurred: $error") }
doOnSubscribe

Действие при подписке. Выполняет указанное действие, когда подписка на Observable начинается.

observable.doOnSubscribe { disposable -> println("Subscribed: $disposable") }
doAfterTerminate

Действие после завершения работы Observable. Выполняет указанное действие после завершения работы Observable, независимо от того, завершился ли поток успешно или с ошибкой.

observable.doAfterTerminate { println("Stream terminated") }
doOnDispose

Действие при отписке. Выполняет указанное действие, когда Disposable объект используется для отписки от Observable.

observable.doOnDispose { println("Disposed") }
Transformation Operators

Функции, которые позволяют трансформировать, фильтровать и управлять потоками данных, поступающими из Observable.

just

Создает Observable, который излучает один элемент.

Observable.just("Hello, World!")
    .subscribe(System.out::println);
fromIterable

Создает Observable из коллекции.

List<String> list = Arrays.asList("One", "Two", "Three");
Observable.fromIterable(list)
    .subscribe(System.out::println);
map

Применяет функцию к каждому элементу.

Observable.just(1, 2, 3)
    .map(i -> i * 2)
    .subscribe(System.out::println);  // Output: 2, 4, 6
flatMap

Преобразует каждый элемент в Observable и объединяет результаты. Порядок генерированных элементов не гарантируется.

Observable.just(1, 2, 3)
    .flatMap(i -> Observable.just(i, i * 2))
    .subscribe(System.out::println);  // Output: 1, 2, 2, 4, 3, 6
concatMap

Преобразует каждый элемент в Observable и объединяет результаты, сохраняя порядок.

Observable.just(1, 2, 3)
    .concatMap(i -> Observable.just(i, i * 2))
    .subscribe(System.out::println);  // Output: 1, 2, 2, 4, 3, 6
switchMap

Переключается на новый Observable каждый раз, когда приходит новый элемент, отменяя предыдущие Observable.

Observable<String> searchRequests = Observable.just("query1", "query2", "query3");

searchRequests
    .switchMap(query -> performSearch(query))
    .subscribe(
        result -> System.out.println("Result: " + result),
        Throwable::printStackTrace,
        () -> System.out.println("Completed")
    );

Observable<String> performSearch(String query) {
    return Observable.just("Results for " + query)
                     .delay(1, TimeUnit.SECONDS);
}
filter

Пропускает элементы, которые не удовлетворяют условию.

Observable.just(1, 2, 3, 4, 5)
    .filter(i -> i % 2 == 0)
    .subscribe(System.out::println);  // Output: 2, 4
skip

Пропускает первые N элементов в Observable и затем передает оставшиеся элементы.

Observable<Integer> source = Observable.range(1, 5);
source.skip(2) // Пропускает первые 2 элемента
    .subscribe(System.out::println); // Output: 3, 4, 5
skipLast

Этот оператор пропускает последние N элементов в Observable и передает все остальные.

Observable<Integer> source = Observable.range(1, 5);
source.skipLast(2) // Пропускает последние 2 элемента
    .subscribe(System.out::println); // Output: 1, 2, 3
take

Берет только первые count элементов.

Observable.just(1, 2, 3, 4, 5)
    .take(3)
    .subscribe(System.out::println);  // Output: 1, 2, 3
takeLast

Берет последние count элементов.

Observable.just(1, 2, 3, 4, 5)
    .takeLast(3)
    .subscribe(System.out::println);  // Output: 3, 4, 5
merge

Объединяет несколько Observable в один.

Observable.merge(
    Observable.just(1, 2),
    Observable.just(3, 4)
).subscribe(System.out::println);  // Output: 1, 2, 3, 4
concat

Объединяет несколько Observable последовательно, сохраняя порядок элементов.

Observable.concat(
    Observable.just(1, 2),
    Observable.just(3, 4)
).subscribe(System.out::println);  // Output: 1, 2, 3, 4
zip

Объединяет элементы из нескольких Observable в пары.

Observable.zip(
    Observable.just(1, 2),
    Observable.just("A", "B"),
    (i, s) -> i + s
).subscribe(System.out::println);  // Output: 1A, 2B
combineLatest

Комбинирует последние значения из нескольких Observable при каждом обновлении любого из них.

Observable.combineLatest(
    Observable.just(1, 2).startWith(0),
    Observable.just("A", "B"),
    (i, s) -> i + s
).subscribe(System.out::println);  // Output: 0A, 1A, 1B, 2B
join

Объединяет элементы из двух Observable на основе определенного условия.

Observable<String> source1 = Observable.just("A", "B", "C");
Observable<Integer> source2 = Observable.just(1, 2, 3);

source1.join(
    source2,
    item -> Observable.timer(1, TimeUnit.SECONDS), // Соединение через Observable
    item -> Observable.timer(1, TimeUnit.SECONDS), // Соединение через Observable
    (s, i) -> s + i // Комбинирование элементов
).subscribe(System.out::println);
onErrorResumeNext

Возвращает новый Observable при возникновении ошибки.

Observable.just(1, 2, 3)
    .map(i -> {
        if (i == 2) throw new RuntimeException("Error!");
        return i;
    })
    .onErrorResumeNext(Observable.just(10, 20))
    .subscribe(System.out::println);  // Output: 1, 10, 20
onErrorReturnItem

Возвращает заданный элемент при возникновении ошибки.

Observable.just(1, 2, 3)
    .map(i -> {
        if (i == 2) throw new RuntimeException("Error!");
        return i;
    })
    .onErrorReturnItem(-1)
    .subscribe(System.out::println);  // Output: 1, -1
retry

Повторяет попытку при возникновении ошибки.

Observable.just(1, 2, 3)
    .map(i -> {
        if (i == 2) throw new RuntimeException("Error!");
        return i;
    })
    .retry(1)
    .subscribe(System.out::println);  // Output: 1, 2, 3
retryWhen

Позволяет управлять повторными попытками при возникновении ошибки.

Observable.just(1, 2, 3)
    .map(i -> {
        if (i == 2) throw new RuntimeException("Error!");
        return i;
    })
    .retryWhen(errors -> errors.take(1).flatMap(error -> Observable.just(4)))
    .subscribe(System.out::println);  // Output: 1, 2, 4, 3
delay

Задерживает эмиссию элементов.

Observable.just(1, 2, 3)
    .delay(1, TimeUnit.SECONDS)
    .subscribe(System.out::println);  // Output: 1, 2, 3 (через 1 секунду)
debounce

Эмитирует элементы, которые не были заменены другим элементом в течение заданного времени.

Observable.create(emitter -> {
    emitter.onNext("A");
    Thread.sleep(100);
    emitter.onNext("B");
    Thread.sleep(100);
    emitter.onNext("C");
    emitter.onComplete();
}).debounce(150, TimeUnit.MILLISECONDS)
  .subscribe(System.out::println);  // Output: C
scan

Применяет функцию накопления к каждому элементу и предыдущему результату.

Observable.just(1, 2, 3)
    .scan(0, (total, next) -> total + next)
    .subscribe(System.out::println);  // Output: 0, 1, 3, 6
distinct

Удаляет дублирующиеся элементы.

Observable.just(1, 1, 2, 2, 3, 3)
    .distinct()
    .subscribe(System.out::println);  // Output: 1, 2, 3
share

Преобразует холодный источник данных в горячий, позволяя нескольким подписчикам совместно использовать одну и ту же подписку на исходный поток данных.

val observable = Observable.just(1, 2, 3)
val sharedObservable = observable.share()

sharedObservable.subscribe { println("Subscriber 1: $it") }
sharedObservable.subscribe { println("Subscriber 2: $it") }
timer

Cоздает Observable, который через указанное время эмитит один элемент и завершает выполнение.

Observable.timer(5, TimeUnit.SECONDS)
    .subscribe { println("Completed after 5 seconds") }
timeout

Используется для указания максимального времени ожидания между элементами потока. Если за это время не поступает новый элемент, поток выбрасывает ошибку TimeoutException.

Observable
    .just(1, 2, 3)
    .delay(2, TimeUnit.SECONDS) // задержка
    .timeout(1, TimeUnit.SECONDS) // если больше 1 секунды ожидания, выбросит ошибку
    .subscribe(
        { item -> println("Received: $item") },
        { error -> println("Error: $error") }
    )
Streams
Observable

Источник данных, который испускает события или элементы данных и позволяет наблюдателям (Observers) подписываться на них. Это основной компонент для работы с реактивным программированием.

Observable.just("Hello", "RxJava")
    .map { it.uppercase() }
    .filter { it.startsWith("H") }
    .subscribe(
        { println("Received: $it") },
        { println("Error: ${it.message}") },
        { println("Completed!") }
    )
Single

Тип Observable, который испускает только одно значение или ошибку. Используется, когда нужно вернуть одно значение, например, результат сетевого запроса.

Single.just("Hello, RxJava")
    .map { it.uppercase() }
    .subscribe(
        { println("Success: $it") },
        { println("Error: ${it.message}") }
    )
Completable

Тип Observable, который не испускает данные, а лишь сигнализирует о завершении (onComplete) или ошибке (onError). Используется для выполнения задач без возвращаемого результата, например, операций записи или удаления.

Completable.fromAction {
    println("Executing some action...")
}
    .subscribe(
        { println("Action completed!") },
        { error -> println("Error: ${error.message}") }
    )
Maybe

Источник, который может либо вернуть одно значение, либо завершиться без результата, либо сообщить об ошибке. Используется для операций, где результат может быть необязательным, например, получение данных из кэша.

Maybe.just("Hello, Maybe")
    .filter { it.length > 5 }
    .subscribe(
        { println("Success: $it") },
        { println("Error: ${it.message}") },
        { println("No data emitted") }
    )
Schedulers

Набор планировщиков, которые управляют выполнением потоков для асинхронной обработки данных. Они определяют, где и как будет исполняться работа, например, в фоновом, основном или вычислительном потоке.

Schedulers.io

Предоставляет пул потоков для задач ввода-вывода, таких как сетевые запросы, чтение файлов или работу с базой данных. Пул автоматически масштабируется под нагрузку.

fun main() {
    Observable.just("File 1", "File 2")
        .subscribeOn(Schedulers.io())
        .subscribe { println("Processing: $it on ${Thread.currentThread().name}") }
}
Schedulers.computation

Предоставляет пул потоков, оптимизированный для вычислительных задач, таких как обработка данных или сложные расчёты. Количество потоков равно числу процессоров устройства.

fun main() {
    Observable.range(1, 5)
        .subscribeOn(Schedulers.computation())
        .map { it * it } // Вычисление квадрата
        .subscribe { println("Result: $it on ${Thread.currentThread().name}") }
}
Schedulers.single

Выполняет все задачи на одном общем потоке, предназначенном для последовательного выполнения. Это полезно для операций, где важна строгая последовательность выполнения.

fun main() {
    Observable.just("Task 1", "Task 2")
        .subscribeOn(Schedulers.single())
        .subscribe { println("Processing: $it on ${Thread.currentThread().name}") }
}
Schedulers.trampoline

Запускает задачи в текущем потоке последовательно, помещая их в очередь. Новая задача выполняется только после завершения предыдущей. Это полезно для тестирования или строгой последовательности задач в одном потоке.

Observable.just("Task 1", "Task 2")
    .subscribeOn(Schedulers.trampoline())
    .subscribe { println("Processing: $it on ${Thread.currentThread().name}") }
Schedulers.newThread

Создаёт новый поток для каждой задачи. Используется, когда требуется изолированный поток выполнения, например, для независимых операций.

fun main() {
    Observable.just("Task 1", "Task 2")
        .subscribeOn(Schedulers.newThread())
        .subscribe { println("Processing: $it on ${Thread.currentThread().name}") }
}
Schedulers.from(Executor)

Позволяет создавать кастомный планировщик (Scheduler) на основе переданного Executor. Это полезно, когда нужно контролировать управление потоками, например, задавать пул потоков.

fun main() {
    val executor = Executors.newFixedThreadPool(2)
    val scheduler = Schedulers.from(executor)

    Observable.just("Task 1", "Task 2")
        .subscribeOn(scheduler)
        .subscribe { println("Processing: $it on ${Thread.currentThread().name}") }

    executor.shutdown()
}
AndroidSchedulers.mainThread

Возвращает Scheduler для выполнения операций в главном (UI) потоке Android. Используется, когда нужно обновить UI после выполнения асинхронных задач.

Observable.just("Hello, RxJava")
    .subscribeOn(Schedulers.io()) // Асинхронная задача выполняется в фоновом потоке
    .observeOn(AndroidSchedulers.mainThread()) // Результат возвращается в UI-поток
    .subscribe { value ->
        textView.text = value // Обновляем UI
    }
Disposable

Механизм управления ресурсами, позволяющий отменять подписки на потоки данных и освобождать ресурсы.

fun main() {
    val observable = Observable.just("Hello", "World")

    val disposable: Disposable = observable.subscribe(
        { item -> println(item) }, // onNext
        { error -> println("Error: ${error.message}") }, // onError
        { println("Completed") } // onComplete
    )

    // Отмена подписки
    disposable.dispose()

    // Проверка, была ли подписка отменена
    println("Disposed: ${disposable.isDisposed}")
}
dispose

Отменяет подписку и освобождает ресурсы.

disposable.dispose()
isDisposed

Проверяет, была ли подписка отменена.

val isDisposed = disposable.isDisposed()
CompositeDisposable

Позволяет управлять несколькими объектами Disposable одновременно. Он упрощает управление подписками, особенно когда у вас есть несколько потоков данных, на которые вы подписаны, и вы хотите отменить их все в одном месте.

fun main() {
    // Создание CompositeDisposable для управления подписками
    val compositeDisposable = CompositeDisposable()

    // Создание Observable
    val observable1 = Observable.just("Hello")
    val observable2 = Observable.just("World")

    // Подписка на первый Observable
    val disposable1: Disposable = observable1.subscribe(
        { item -> println(item) },
        { error -> println("Error: ${error.message}") },
        { println("Completed first observable") }
    )

    // Подписка на второй Observable
    val disposable2: Disposable = observable2.subscribe(
        { item -> println(item) },
        { error -> println("Error: ${error.message}") },
        { println("Completed second observable") }
    )

    // Добавление Disposables в CompositeDisposable
    compositeDisposable.addAll(disposable1, disposable2)

    // Отмена всех подписок
    compositeDisposable.dispose()

    // Проверка, были ли подписки отменены
    println("All disposed: ${compositeDisposable.size() == 0}")
}
Flowable

Предназначенный для работы с большими потоками данных или с данными, которые поступают с высокой частотой. Flowable решает проблему перегрузки потока с помощью механизма backpressure, позволяющего регулировать скорость отправки данных и обработки.

Flowable.range(1, 10)
    .map { it * 2 }
    .subscribe(
        { value -> println("Получено: $value") },
        { error -> println("Ошибка: $error") }
    )
BackpressureStrategy.MISSING

Cтратегия по умолчанию, которая не применяет никакой политики к backpressure. Если источник данных генерирует события слишком быстро, возникает ошибка MissingBackpressureException.

Flowable.create<Int>({ emitter ->
    for (i in 1..1000) {
        emitter.onNext(i)
    }
    emitter.onComplete()
}, BackpressureStrategy.MISSING)
    .observeOn(Schedulers.io())
    .subscribe(
        { value -> println("Получено: $value") },
        { error -> println("Ошибка: $error") } // Вероятно, будет MissingBackpressureException
    )
BackpressureStrategy.BUFFER

Эта стратегия сохраняет все данные в буфере. Подходит, если данных не слишком много. Однако, если данных слишком много, может привести к переполнению памяти.

Flowable.create<Int>({ emitter ->
    for (i in 1..1000) {
        emitter.onNext(i)
    }
    emitter.onComplete()
}, BackpressureStrategy.BUFFER)
    .observeOn(Schedulers.io())
    .subscribe(
        { value -> println("Получено: $value") },
        { error -> println("Ошибка: $error") }
    )
BackpressureStrategy.DROP

Пропускает новые данные, если потребитель не успевает их обрабатывать. Полезно для обработки событий, где важно только последнее значение, а не весь поток.

Flowable.interval(1, TimeUnit.MILLISECONDS)
    .onBackpressureDrop()
    .observeOn(Schedulers.io())
    .subscribe(
        { value -> println("Получено: $value") },
        { error -> println("Ошибка: $error") }
    )
BackpressureStrategy.LATEST

Сохраняет только последнее значение, отбрасывая остальные. Подходит, если важно всегда иметь доступ к самому последнему значению.

Flowable.interval(1, TimeUnit.MILLISECONDS)
    .onBackpressureLatest()
    .observeOn(Schedulers.io())
    .subscribe(
        { value -> println("Получено: $value") },
        { error -> println("Ошибка: $error") }
    )
BackpressureStrategy.ERROR

Выбрасывает ошибку MissingBackpressureException, если данных слишком много. Используется, когда не нужно обрабатывать данные при перегрузке.

Flowable.create<Int>({ emitter ->
    for (i in 1..1000) {
        emitter.onNext(i)
    }
    emitter.onComplete()
}, BackpressureStrategy.ERROR)
    .observeOn(Schedulers.io())
    .subscribe(
        { value -> println("Получено: $value") },
        { error -> println("Ошибка: $error") } // Будет MissingBackpressureException
    )
Subjects

Subject – это абстрактный класс в RxJava, одновременно расширяющий класс Observable и реализующий интерфейс Observer. Subject – это hot observable. Виды: Publish, Replay, Behavior, Async, Unicast.

PublishSubject

Работает как и Observable, и Observer одновременно. Он начинает отправлять данные подписчикам только после их подписки, игнорируя все предыдущие события.

• Не сохраняет предыдущие данные. Новые подписчики получают только последующие события.

• Завершает поток с помощью onComplete().

• Может отправить ошибку через onError(). Если onError() был вызван, подписчики больше не будут получать данные.

val subject = PublishSubject.create<String>()

// Подписчик 1
subject.subscribe { value ->
    println("Подписчик 1: $value")
}

subject.onNext("Первое событие") // Подписчик 1 получит "Первое событие"

// Подписчик 2 подписывается после первого события
subject.subscribe { value ->
    println("Подписчик 2: $value")
}

subject.onNext("Второе событие") 
// Оба подписчика получат "Второе событие"

ReplaySubjectПодписчики получают все данные, отправленные до подписки и все новые данные с момента подписки (студент опоздал на лекцию, но получает ее содержимое с самого начала). Если ReplaySubject создается фабричным методом createWithSize(size: Int), то подписчики будут получать только заданное количество элементов, отправленных в прошлом.
BehaviorSubjectПодписчики получат последний элемент данных, разосланный до подписки и все новые данные с момента подписки.
AsyncSubjectПодписчики получат только самый последний элемент данных, который был отправлен перед вызовом onComplete (студент пришел на пару и получил только домашку).
UnicastSubjectРаботает также как ReplaySubject но может иметь только одного подписчика. Все последующие подписчики получают onError с IllegalStateException.
Вопросы на собесе (36)
  • Streams (3)
    1. Какие основные стримы существуют в RxJava?

      Observable Single Completable Maybe

    1. Что такое Observable и как он работает?

      Observable представляет поток данных, который эмитирует значения и позволяет подписаться на них для обработки. Подписка запускает поток данных и обрабатывает его изменения.

    1. В чем разница между Observable и Single?

      Observable может эмитировать несколько значений или событий, тогда как Single эмитирует только одно значение или ошибку. Single предназначен для сценариев, где ожидается лишь одно результатное значение, например, при выполнении асинхронной операции.

  • Subjects (2)
    1. Что такое Subject и чем оно отличается от Observable?

      Subject — это объект, который позволяет как подписываться на события, так и эмитировать их. В отличие от Observable, который только эмитирует данные, Subject может действовать как источник и получатель данных.

    1. Какой тип Subject хранит только последний элемент и передает его новым подписчикам?

      PublishSubject

      BehaviorSubject

      ReplaySubject

      AsyncSubject

  • Operators (15)
    1. Как объединить несколько Observable потоков в один?

      Используйте операторы merge, concat, или zip.

    1. Как обрабатывать ошибки в RxJava?

      Через onError, retry и onErrorResumeNext.

    1. Как работает оператор zip в RxJava?

      Оператор zip объединяет несколько источников, комбинируя их элементы поочередно, пока все источники не завершатся. Он использует функцию для объединения соответствующих элементов и передает результат в новый Observable.

    1. Как сделать параллельные запросы в RxJava?

      Использовать операторы zip() или merge().

    1. Разница между операторами map, flatMap, concatMap и switchMap в RxJava?

      map преобразует каждое значение, эмитированное Observable, в другое значение, возвращая одиночный элемент на каждое входное значение.

      flatMap преобразует каждое значение в новый Observable и объединяет результаты асинхронно, что может привести к перемешиванию последовательности элементов.

      concatMap аналогичен flatMap, но сохраняет порядок, обрабатывая каждый Observable поочерёдно, не смешивая результаты.

      switchMap преобразует значение в новый Observable, отменяя предыдущий Observable, если приходит новое значение, что полезно для обработки только последнего запроса.

    1. Есть 2 источника Observable, первый испускает элементы A, второй - элементы Б. В случае применения switchMap какой будет результат?

      При использовании switchMap результатом будет только последовательность элементов из второго Observable, испускающего элементы Б. Это происходит потому, что switchMap отписывается от предыдущего Observable (первого, который испускает A), как только второй Observable (Б) начинает испускать значения. Таким образом, элементы A будут проигнорированы, и результатом будут только элементы Б.

    1. Есть 2 источника Observable, первый испускает элементы A, второй - элементы Б. В случае применения concatMap какой будет результат?

      При использовании concatMap результатом будет последовательность, содержащая сначала все элементы A из первого Observable, а затем все элементы Б из второго Observable, так как concatMap обрабатывает каждый источник поочерёдно, сохраняя порядок.

    1. Какой оператор используется для объединения данных из двух Observable на основе условия?

      join

      combineLatest

      merge

      zip

    1. Какой оператор используется для объединения нескольких Observable, при этом дожидаясь завершения всех источников?

      zip

      combineLatest

      concat

      merge

    1. Какой оператор используется для объединения нескольких источников данных, при этом элементы могут излучаться вперемешку?

      concat

      zip

      combineLatest

      merge

    1. Какой оператор используется для выполнения побочных эффектов при подписке на Observable?

      doOnNext

      doOnComplete

      doOnSubscribe

      doOnError

    1. Какой оператор используется для выполнения действия после окончания работы Observer?

      doAfterComplete

      doOnNext

      doOnComplete

      doOnTerminate

    1. Какой оператор используется для пропуска первых N элементов, излучаемых Observable?

      ignore

      filter

      take

      skip

    1. Какой из перечисленных источников данных RxJava либо успешно завершает свою работу без какого-либо возвращаемого значения, либо выбрасывает исключение?

      Observable

      Flowable

      Maybe

      Completable

    1. Какой оператор используется для обработки ошибок в RxJava?

      onErrorResumeNext

      onError

      handleError

      catchError

  • Disposable (2)
    1. Что такое Disposable и как управлять жизненным циклом Observable?

      Disposable используется для управления жизненным циклом Observable и отмены подписки, освобождая ресурсы. Для этого сохраняйте Disposable и вызывайте dispose() при необходимости.

    1. Какой из следующих классов позволяет управлять подписками в RxJava?

      Disposable

      Observer

      Publisher

      Subscriber

  • Другие (14)
    1. Что такое backpressure и как его обрабатывать в RxJava?

      Backpressure возникает, когда поток данных поступает быстрее, чем может быть обработан. Обрабатывать его можно с помощью операторов, таких как onBackpressureBuffer, onBackpressureDrop или onBackpressureLatest, которые управляют тем, как данные накапливаются или сбрасываются, когда система перегружена.

    1. Почему возникает UndeliverableException?

      Возникает, когда исключение не может быть доставлено подписчику, обычно из-за завершения подписки или отсутствия обработчика ошибок. Оно может возникнуть, если ошибка происходит после завершения подписки или если ошибка не обрабатывается в методе onError. Для предотвращения можно настроить глобальный обработчик ошибок с помощью RxJavaPlugins.setErrorHandler.

    1. Что такое Flowable?

      Flowable — это один из типов источников данных, предназначенный для работы с асинхронными потоками данных. Он поддерживает обратный поток (backpressure), что позволяет контролировать скорость, с которой данные отправляются подписчикам, и предотвращает переполнение буфера при обработке больших объемов данных.

    1. Какие источники в RxJava являются холодными и горячими?

      Холодные источники (например, Observable, Flowable, Single, Maybe) начинают испускать данные только при подписке на них. Каждая подписка получает свой собственный набор данных, что позволяет избежать дублирования.

      Горячие источники (например, Subject, BehaviorSubject, ReplaySubject) испускают данные независимо от подписчиков. Подписчики могут пропустить данные, если они подписываются после того, как данные уже были отправлены.

    1. Как превратить холодный источник данных в горячий в RxJava?

      Использовать Subject.

      val subject = PublishSubject.create<DataType>()
      coldSource.subscribe(subject)

      Использовать оператор share().

      val hotObservable = coldSource.share()
    1. На каком Shedulers будет выполнен оператор filter() в данной rx-цепочке?
      Observable.just("1")
          .subscribe0n(Schedulers.io())
          .map {}
          .observe0n(Schedulers.trampoline())
          .filter {}
          .observe0n(AndroidSchedulers.mainThread())
          .subscribe {}

      На Schedulers.trampoline(), так как observeOn(Schedulers.trampoline()) меняет поток для всех последующих операторов до следующего вызова observeOn().

    1. Что выведет println?
      fun main() {
      		val orderIds: List<Long> = listOf(1, 2, 3)
      		loadAllOrders(orderIds)
      				.subscribel { orders ->
      						val result = orderIds = orders.map(Order::id)
      						println(result)
      				})
      }
      
      private fun loadAllOrders(orderIds: List<Long>): Single<List<Order>>{
      		return Observable.fromIterable(orderIds)
      				.subscribeOn(Schedulers.io))
      				.flatMapSingle(::loadOrderById)
      				.toList()
      }

      flatMap не гарантирует порядок выдаваемых значений, поэтому результат может быть и true и false. Чтобы всегда было true, flatMap нужно поменять на concatMap. Он будет работать дольше. На больших данных заметно дольше.

      Решение: после метода toList добавить сортировку.

    1. На каком потоке вызывается код в doOnSubscribe?
      fun updateRegions() {
      		getRegions()
      				.subscribeOn(Schedulers.single())
      				.observen(Schedulers.computation())
      				.doOnSubscribe {
      						showProgress()
      				}
      				.subscribeOn(Schedulers.io))
      				.subscribe { regions ->
      						showRegions(regions)
      				}
      }

      Правильный ответ - io. Поток для doOnSubscribe задается первый subscribeOn после него.

    1. На каком потоке вызывается код в flatMap?
      fun updateRegions() {
      		getRegions()
      				.subscribeOn(Schedulers.single())
      				.observeOn(Schedulers.computation())
      				.flatMap {
      						getCity()
      				}
      				.subscribeOn(Schedulers.io))
      				.subscribe(
      						{ regions ->
      								showRegions (regions)
      						}
      				)
      }

      Код внутри flatMap вызывается на том потоке, который был установлен последним перед ним, то есть на Schedulers.computation из вызова observeOn.

    1. На каком потоке вызывается код в onSuccess?
      fun updateRegions() {
      		getRegions()
      				.observen(Schedulers.computation())
      				.observeOn(Schedulers.io))
      				.subscribe(
      						{ regions ->
      								showRegions(regions)
      						}
      				)
      }

      Метод onSuccess в subscribe вызывается на потоке, который был установлен последним вызовом observeOn. В данном случае, это Schedulers.io, поэтому код внутри лямбды subscribe (включая вызов showRegions(regions)) выполнится на IO-потоке.

    1. На каком потоке вызывается код в onSuccess?
      fun updateRegions() {
      		getRegions()
      				.subscribeOn(Schedulers.computation())
      				.subscribeOn(Schedulers.io())
      				.subscribe(
      					 { regions ->
      								showRegions (regions)
      						}
      }

      Правильный ответ - computation. Будет отрабатывать тот subscribeOn который находится ближе всего к источнику - остальные subscribeOn будут игнорироваться. Не имеет значения в каком месте ветки указан subscribeOn.

    1. На каком потоке вызывается код в onSuccess?
      fun updateRegions() {
      		getRegions ()
      				.subscribeOn(Schedulers.io))
      				.subscribe(
      						{ regions ->
      								showRegions(regions)
      						}
      }

      Поток, на котором будет вызван метод onSuccess, зависит от использования операторов subscribeOn и observeOn. Здесь subscribeOn(Schedulers.io) указывает, что работа с источником данных getRegions будет выполняться в потоке из пула Schedulers.io. Однако, метод subscribe по умолчанию работает в том же потоке, где происходит подписка, если не используется observeOn.

    1. На каком потоке вызывается код в onSuccess?
      fun updateRegions() {
      		getRegions()
      				.subscribe(
      						{ regions ->
      								showRegions (regions)
      						}
      }

      По умолчанию onSuccess вызывается на том потоке, на котором произошло завершение работы источника данных. Если вы не указали явно subscribeOn и observeOn, то выполнение будет происходить в том же потоке, где выполняется getRegions.

    1. На каком потоке вызывается код в onSuccess?
      fun updateRegions() {
      		getRegions()
      				.subscribeOn(Schedulers.io))
      				.observeOn(Schedulers.computation())
      				.subscribe(
      						{ regions ->
      								showRegions(regions)
      						}
      				)
      }

      Правильный ответ - на computation. Поток запустится на io. subscribeOn отвечает за методы выше. observeOn отвечает за методы ниже. Результат выведется на ближайшем observeOn.