RxJava
RxJava
Библиотека для асинхронного программирования на Java, основанная на концепциях функционального программирования и реактивного программирования. Основная цель RxJava — облегчить работу с асинхронными потоками данных и событиями.
• Cold observable. Создаются по запросу и выдают данные при подписке на них. Пример: retrofit.
• Hot observable. Всегда активны и выдают данные независимо от того подписаны на них или нет. Пример: обработка жестов view.
Threading Operators
subscribeOn
Определяет поток, на котором будет выполнено создание Observable и запуск его операций. Влияет на операторы выше. Если указан только subscribeOn - все операторы будут выполняться в указанном потоке.
Observable.just("item")
.subscribeOn(Schedulers.io()) // Выполнение Observable на IO-потоке
.observeOn(AndroidSchedulers.mainThread()) // Обработка результатов на основном потоке
.subscribe(
item -> System.out.println("Received: " + item),
Throwable::printStackTrace
);
observeOn
Определяет поток, на котором будет производиться обработка элементов и уведомление об их изменении. Влияет на операторы ниже. Если указан только observeOn - все операторы будут выполняться в текущем потоке, а операторы, расположенные ниже observeOn будут переключаться на поток, указанный в observeOn.
Observable.just("item")
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation()) // Обработка результатов на вычислительном потоке
.map(item -> item.toUpperCase())
.observeOn(AndroidSchedulers.mainThread()) // Обработка окончательных результатов на основном потоке
.subscribe(
item -> System.out.println("Processed: " + item),
Throwable::printStackTrace
);
doOnNext
Действие с каждым элементом. Выполняет указанное действие для каждого элемента, который проходит через Observable, перед тем как он будет передан дальше.
observable.doOnNext { item -> println("Item: $item") }
doOnComplete
Действие при завершении потока. Выполняет указанное действие, когда Observable завершает эмитирование всех элементов. Не вызывается, если поток был прерван с ошибкой.
observable.doOnComplete { println("Stream completed") }
doOnError
Действие при ошибке. Выполняет указанное действие, когда возникает ошибка в процессе работы Observable.
observable.doOnError { error -> println("Error occurred: $error") }
doOnSubscribe
Действие при подписке. Выполняет указанное действие, когда подписка на Observable начинается.
observable.doOnSubscribe { disposable -> println("Subscribed: $disposable") }
doAfterTerminate
Действие после завершения работы Observable. Выполняет указанное действие после завершения работы Observable, независимо от того, завершился ли поток успешно или с ошибкой.
observable.doAfterTerminate { println("Stream terminated") }
doOnDispose
Действие при отписке. Выполняет указанное действие, когда Disposable объект используется для отписки от Observable.
observable.doOnDispose { println("Disposed") }
Transformation Operators
Функции, которые позволяют трансформировать, фильтровать и управлять потоками данных, поступающими из Observable
.
just
Создает Observable, который излучает один элемент.
Observable.just("Hello, World!")
.subscribe(System.out::println);
fromIterable
Создает Observable из коллекции.
List<String> list = Arrays.asList("One", "Two", "Three");
Observable.fromIterable(list)
.subscribe(System.out::println);
map
Применяет функцию к каждому элементу.
Observable.just(1, 2, 3)
.map(i -> i * 2)
.subscribe(System.out::println); // Output: 2, 4, 6
flatMap
Преобразует каждый элемент в Observable и объединяет результаты. Порядок генерированных элементов не гарантируется.
Observable.just(1, 2, 3)
.flatMap(i -> Observable.just(i, i * 2))
.subscribe(System.out::println); // Output: 1, 2, 2, 4, 3, 6
concatMap
Преобразует каждый элемент в Observable и объединяет результаты, сохраняя порядок.
Observable.just(1, 2, 3)
.concatMap(i -> Observable.just(i, i * 2))
.subscribe(System.out::println); // Output: 1, 2, 2, 4, 3, 6
switchMap
Переключается на новый Observable каждый раз, когда приходит новый элемент, отменяя предыдущие Observable.
Observable<String> searchRequests = Observable.just("query1", "query2", "query3");
searchRequests
.switchMap(query -> performSearch(query))
.subscribe(
result -> System.out.println("Result: " + result),
Throwable::printStackTrace,
() -> System.out.println("Completed")
);
Observable<String> performSearch(String query) {
return Observable.just("Results for " + query)
.delay(1, TimeUnit.SECONDS);
}
filter
Пропускает элементы, которые не удовлетворяют условию.
Observable.just(1, 2, 3, 4, 5)
.filter(i -> i % 2 == 0)
.subscribe(System.out::println); // Output: 2, 4
skip
Пропускает первые N элементов в Observable и затем передает оставшиеся элементы.
Observable<Integer> source = Observable.range(1, 5);
source.skip(2) // Пропускает первые 2 элемента
.subscribe(System.out::println); // Output: 3, 4, 5
skipLast
Этот оператор пропускает последние N элементов в Observable и передает все остальные.
Observable<Integer> source = Observable.range(1, 5);
source.skipLast(2) // Пропускает последние 2 элемента
.subscribe(System.out::println); // Output: 1, 2, 3
take
Берет только первые count элементов.
Observable.just(1, 2, 3, 4, 5)
.take(3)
.subscribe(System.out::println); // Output: 1, 2, 3
takeLast
Берет последние count элементов.
Observable.just(1, 2, 3, 4, 5)
.takeLast(3)
.subscribe(System.out::println); // Output: 3, 4, 5
merge
Объединяет несколько Observable в один.
Observable.merge(
Observable.just(1, 2),
Observable.just(3, 4)
).subscribe(System.out::println); // Output: 1, 2, 3, 4
concat
Объединяет несколько Observable последовательно, сохраняя порядок элементов.
Observable.concat(
Observable.just(1, 2),
Observable.just(3, 4)
).subscribe(System.out::println); // Output: 1, 2, 3, 4
zip
Объединяет элементы из нескольких Observable в пары.
Observable.zip(
Observable.just(1, 2),
Observable.just("A", "B"),
(i, s) -> i + s
).subscribe(System.out::println); // Output: 1A, 2B
combineLatest
Комбинирует последние значения из нескольких Observable при каждом обновлении любого из них.
Observable.combineLatest(
Observable.just(1, 2).startWith(0),
Observable.just("A", "B"),
(i, s) -> i + s
).subscribe(System.out::println); // Output: 0A, 1A, 1B, 2B
join
Объединяет элементы из двух Observable на основе определенного условия.
Observable<String> source1 = Observable.just("A", "B", "C");
Observable<Integer> source2 = Observable.just(1, 2, 3);
source1.join(
source2,
item -> Observable.timer(1, TimeUnit.SECONDS), // Соединение через Observable
item -> Observable.timer(1, TimeUnit.SECONDS), // Соединение через Observable
(s, i) -> s + i // Комбинирование элементов
).subscribe(System.out::println);
onErrorResumeNext
Возвращает новый Observable при возникновении ошибки.
Observable.just(1, 2, 3)
.map(i -> {
if (i == 2) throw new RuntimeException("Error!");
return i;
})
.onErrorResumeNext(Observable.just(10, 20))
.subscribe(System.out::println); // Output: 1, 10, 20
onErrorReturnItem
Возвращает заданный элемент при возникновении ошибки.
Observable.just(1, 2, 3)
.map(i -> {
if (i == 2) throw new RuntimeException("Error!");
return i;
})
.onErrorReturnItem(-1)
.subscribe(System.out::println); // Output: 1, -1
retry
Повторяет попытку при возникновении ошибки.
Observable.just(1, 2, 3)
.map(i -> {
if (i == 2) throw new RuntimeException("Error!");
return i;
})
.retry(1)
.subscribe(System.out::println); // Output: 1, 2, 3
retryWhen
Позволяет управлять повторными попытками при возникновении ошибки.
Observable.just(1, 2, 3)
.map(i -> {
if (i == 2) throw new RuntimeException("Error!");
return i;
})
.retryWhen(errors -> errors.take(1).flatMap(error -> Observable.just(4)))
.subscribe(System.out::println); // Output: 1, 2, 4, 3
delay
Задерживает эмиссию элементов.
Observable.just(1, 2, 3)
.delay(1, TimeUnit.SECONDS)
.subscribe(System.out::println); // Output: 1, 2, 3 (через 1 секунду)
debounce
Эмитирует элементы, которые не были заменены другим элементом в течение заданного времени.
Observable.create(emitter -> {
emitter.onNext("A");
Thread.sleep(100);
emitter.onNext("B");
Thread.sleep(100);
emitter.onNext("C");
emitter.onComplete();
}).debounce(150, TimeUnit.MILLISECONDS)
.subscribe(System.out::println); // Output: C
scan
Применяет функцию накопления к каждому элементу и предыдущему результату.
Observable.just(1, 2, 3)
.scan(0, (total, next) -> total + next)
.subscribe(System.out::println); // Output: 0, 1, 3, 6
distinct
Удаляет дублирующиеся элементы.
Observable.just(1, 1, 2, 2, 3, 3)
.distinct()
.subscribe(System.out::println); // Output: 1, 2, 3
Streams
Observable
Представляет стрим объектов. Подписчики на Observable имеют коллбэки onNext(value), onComplete(), onError(throwable). onNext() может не вызываться, или вызываться произвольное количество раз. При завершении стрима вызывается onComplete() или onError().
Single
Отправляет объект, который принимается в коллбэке onSuccess(value), или бросает исключение в коллбэк onError(throwable) в случае ошибки.
Completable
Не возвращает никакого значения. На подписчиках вызывается onComplete() при удачном завершении или onError(throwable) в случае ошибки.
Maybe
Может отработать как Single или как Completable. На подписчиках вызывается один из трех коллбэков: onSuccess(value), onComplete() без какого-либо значения, или onError(throwable). Каждый из коллбэков может быть вызван один раз или не вызван вообще.
Schedulers
Потоки в RxJava выполняются с помощью планировщиков. Scheduler управляет одним или несколькими потоками. Когда планировщику необходимо выполнить задачу он берет поток из своего пула и запускает в нем задачу.
Schedulers.io
Поддерживается неограниченным пулом потоков. Используется для операций ввода-вывода, не требующих интенсивного использования ЦП (взаимодействие с файловой системой, сетевые вызовы, базы данных).
Schedulers.computation
Поддерживается ограниченным пулом потоков, размер которого не больше количества доступных процессоров (Runtime.availableProcessors). Использовать для вычислительных и ресурсоемких работ (изменение размера изображений, обработка больших наборов данных). Не делать таких потоков больше, чем доступно ядер, иначе производительность будет снижаться из-за переключения контекста и накладных расходов на создание потоков.
Schedulers.newThread
Создает новый поток для каждой запланированной задачи. Это планировщик стоит дорого, так как не происходит повторного переиспользования потока.
Schedulers.from(Executor)
Создает настраиваемый планировщик, поддерживаемый указанным executor. Если задача запланирована, когда все потоки заняты - будет поставлена в очередь.
Scheduler.from(Executors.newFixedThreadPool(n)) // ограничить количество одновременных потоков в пуле
Schedulers.single
Поддерживается одним потоком, выполняет задачи последовательно в указанном порядке.
Schedulers.trampoline
Выполняет задачи в порядке FIFO (first in first out) одним из рабочих потоков. Используется при реализации рекурсии, чтобы не увеличивался стек вызовов.
AndroidSchedulers.mainThread
Основной поток - место, где происходит взаимодействие с пользователем.
Disposable
Disposable
Интерфейс с методами dispose() и isDisposed(). Работае как оболочка над другими операторами.
CompositeDisposable
Объединяет несколько disposable, которые могут быть удалены одновременно.
Flowable
Flowable
Работает как Observable, но поддерживает backpressure по умолчанию.
Backpressure
Ситуация, когда observable выдает элементы быстрее, чем оператор или подписчик может использовать. Observable имеет бесконечный буфер, в который будут добавляться элементы до тех пор, пока не случится OutOfMemoryError. BackpressureStrategy – это enum, который задает стратегию обработки backpressure.
BackpressureStrategy.MISSING | стратегия не установлена |
BackpressureStrategy.ERROR | в случае backpressure бросает исключение MissingBackpressureException |
BackpressureStrategy.BUFFER | буфер будет расширяться каждый раз, когда ему потребуется больше памяти. Его первоначальный размер составляет 128 элементов |
BackpressureStrategy.DROP | если потребитель занят обработкой данных, эта стратегия буквально отбрасывает все значения, которые не могут быть обработаны |
BackpressureStrategy.LATEST | также отбрасывает неиспользуемые значения, но кэширует последнее. Его можно сравнить с БУФЕРОМ с постоянным размером в 1 элемент. Благодаря этому, когда потребитель запрашивает новое значение, он всегда получает самое последнее |
Flowable.create<Int>({ emitter: FlowableEmitter<Int> ->
repeat(1_000) { count -> emitter.onNext(count) }
}, BackpressureStrategy.BUFFER)
.subscribe { println(it) }
Subjects
Subject – это абстрактный класс в RxJava, одновременно расширяющий класс Observable
и реализующий интерфейс Observer
. Subject – это hot observable. Виды: Publish, Replay, Behavior, Async, Unicast.
PublishSubject | Подписчики получают новые данные с момента подписки. PublishSubject не кэширует и не рассылает прошлые элементы (студент слушает лекцию с того момента, как вошел в аудиторию). |
ReplaySubject | Подписчики получают все данные, отправленные до подписки и все новые данные с момента подписки (студент опоздал на лекцию, но получает ее содержимое с самого начала). Если ReplaySubject создается фабричным методом createWithSize(size: Int), то подписчики будут получать только заданное количество элементов, отправленных в прошлом. |
BehaviorSubject | Подписчики получат последний элемент данных, разосланный до подписки и все новые данные с момента подписки. |
AsyncSubject | Подписчики получат только самый последний элемент данных, который был отправлен перед вызовом onComplete (студент пришел на пару и получил только домашку). |
UnicastSubject | Работает также как ReplaySubject но может иметь только одного подписчика. Все последующие подписчики получают onError с IllegalStateException . |
RxJava. Вопросы на собесе
- Какие основные операторы существуют в RxJava?
- Что такое Observable и как оно работает?
- В чем разница между Observable и Single?
- Что такое Subject и чем оно отличается от Observable?
- Как объединить несколько Observable потоков в один?
- Как обрабатывать ошибки в RxJava?
- Что такое backpressure и как его обрабатывать в RxJava?
- Что такое Disposable и как управлять жизненным циклом Observable?
- Как сделать параллельные запросы в RxJava?
Использовать операторы
zip()
илиmerge()
.
- Почему возникает UndeliverableException?
Возникает, когда исключение не может быть доставлено подписчику, обычно из-за завершения подписки или отсутствия обработчика ошибок. Оно может возникнуть, если ошибка происходит после завершения подписки или если ошибка не обрабатывается в методе onError. Для предотвращения можно настроить глобальный обработчик ошибок с помощью
RxJavaPlugins.setErrorHandler
.
- Какой оператор используется для объединения данных из двух Observable на основе условия?
• join
• combineLatest
• merge
• zip
- Какой тип Subject хранит только последний элемент и передает его новым подписчикам?
• PublishSubject
• BehaviorSubject
• ReplaySubject
• AsyncSubject
- Какой оператор используется для объединения нескольких Observable, при этом дожидаясь завершения всех источников?
• zip
• combineLatest
• concat
• merge
- Какой из следующих классов позволяет управлять подписками в RxJava?
• Disposable
• Observer
• Publisher
• Subscriber
- Какой оператор используется для выполнения действия после окончания работы Observer?
• doAfterComplete
• doOnNext
• doOnComplete
• doOnTerminate
- Какой оператор используется для пропуска первых N элементов, излучаемых Observable?
• ignore
• filter
• take
• skip