RxJava
RxJava
Библиотека для асинхронного программирования на Java, основанная на концепциях функционального программирования и реактивного программирования. Основная цель RxJava — облегчить работу с асинхронными потоками данных и событиями.
• Cold observable. Создаются по запросу и выдают данные при подписке на них. Пример: retrofit.
• Hot observable. Всегда активны и выдают данные независимо от того подписаны на них или нет. Пример: обработка жестов view.
Threading Operators
subscribeOn
Определяет поток, на котором будет выполнено создание Observable
и запуск его операций. Влияет на операторы выше. Если указан только subscribeOn
- все операторы будут выполняться в указанном потоке.
Observable.just("item")
.subscribeOn(Schedulers.io()) // Выполнение Observable на IO-потоке
.observeOn(AndroidSchedulers.mainThread()) // Обработка результатов на основном потоке
.subscribe(
item -> System.out.println("Received: " + item),
Throwable::printStackTrace
);
observeOn
Определяет поток, на котором будет производиться обработка элементов и уведомление об их изменении. Влияет на операторы ниже. Если указан только observeOn
- все операторы будут выполняться в текущем потоке, а операторы, расположенные ниже observeOn
будут переключаться на поток, указанный в observeOn
.
Observable.just("item")
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation()) // Обработка результатов на вычислительном потоке
.map(item -> item.toUpperCase())
.observeOn(AndroidSchedulers.mainThread()) // Обработка окончательных результатов на основном потоке
.subscribe(
item -> System.out.println("Processed: " + item),
Throwable::printStackTrace
);
doOnNext
Действие с каждым элементом. Выполняет указанное действие для каждого элемента, который проходит через Observable
, перед тем как он будет передан дальше.
observable.doOnNext { item -> println("Item: $item") }
doOnComplete
Действие при завершении потока. Выполняет указанное действие, когда Observable
завершает эмитирование всех элементов. Не вызывается, если поток был прерван с ошибкой.
observable.doOnComplete { println("Stream completed") }
doOnError
Действие при ошибке. Выполняет указанное действие, когда возникает ошибка в процессе работы Observable
.
observable.doOnError { error -> println("Error occurred: $error") }
doOnSubscribe
Действие при подписке. Выполняет указанное действие, когда подписка на Observable
начинается.
observable.doOnSubscribe { disposable -> println("Subscribed: $disposable") }
doAfterTerminate
Действие после завершения работы Observable
. Выполняет указанное действие после завершения работы Observable
, независимо от того, завершился ли поток успешно или с ошибкой.
observable.doAfterTerminate { println("Stream terminated") }
doOnDispose
Действие при отписке. Выполняет указанное действие, когда Disposable
объект используется для отписки от Observable
.
observable.doOnDispose { println("Disposed") }
Transformation Operators
Функции, которые позволяют трансформировать, фильтровать и управлять потоками данных, поступающими из Observable
.
just
Создает Observable
, который излучает один элемент.
Observable.just("Hello, World!")
.subscribe(System.out::println);
fromIterable
Создает Observable
из коллекции.
List<String> list = Arrays.asList("One", "Two", "Three");
Observable.fromIterable(list)
.subscribe(System.out::println);
map
Применяет функцию к каждому элементу.
Observable.just(1, 2, 3)
.map(i -> i * 2)
.subscribe(System.out::println); // Output: 2, 4, 6
flatMap
Преобразует каждый элемент в Observable
и объединяет результаты. Порядок генерированных элементов не гарантируется.
Observable.just(1, 2, 3)
.flatMap(i -> Observable.just(i, i * 2))
.subscribe(System.out::println); // Output: 1, 2, 2, 4, 3, 6
concatMap
Преобразует каждый элемент в Observable
и объединяет результаты, сохраняя порядок.
Observable.just(1, 2, 3)
.concatMap(i -> Observable.just(i, i * 2))
.subscribe(System.out::println); // Output: 1, 2, 2, 4, 3, 6
switchMap
Переключается на новый Observable
каждый раз, когда приходит новый элемент, отменяя предыдущие Observable
.
Observable<String> searchRequests = Observable.just("query1", "query2", "query3");
searchRequests
.switchMap(query -> performSearch(query))
.subscribe(
result -> System.out.println("Result: " + result),
Throwable::printStackTrace,
() -> System.out.println("Completed")
);
Observable<String> performSearch(String query) {
return Observable.just("Results for " + query)
.delay(1, TimeUnit.SECONDS);
}
filter
Пропускает элементы, которые не удовлетворяют условию.
Observable.just(1, 2, 3, 4, 5)
.filter(i -> i % 2 == 0)
.subscribe(System.out::println); // Output: 2, 4
skip
Пропускает первые N элементов в Observable
и затем передает оставшиеся элементы.
Observable<Integer> source = Observable.range(1, 5);
source.skip(2) // Пропускает первые 2 элемента
.subscribe(System.out::println); // Output: 3, 4, 5
skipLast
Этот оператор пропускает последние N элементов в Observable
и передает все остальные.
Observable<Integer> source = Observable.range(1, 5);
source.skipLast(2) // Пропускает последние 2 элемента
.subscribe(System.out::println); // Output: 1, 2, 3
take
Берет только первые count элементов.
Observable.just(1, 2, 3, 4, 5)
.take(3)
.subscribe(System.out::println); // Output: 1, 2, 3
takeLast
Берет последние count элементов.
Observable.just(1, 2, 3, 4, 5)
.takeLast(3)
.subscribe(System.out::println); // Output: 3, 4, 5
merge
Объединяет несколько Observable
в один.
Observable.merge(
Observable.just(1, 2),
Observable.just(3, 4)
).subscribe(System.out::println); // Output: 1, 2, 3, 4
concat
Объединяет несколько Observable
последовательно, сохраняя порядок элементов.
Observable.concat(
Observable.just(1, 2),
Observable.just(3, 4)
).subscribe(System.out::println); // Output: 1, 2, 3, 4
zip
Объединяет элементы из нескольких Observable
в пары.
Observable.zip(
Observable.just(1, 2),
Observable.just("A", "B"),
(i, s) -> i + s
).subscribe(System.out::println); // Output: 1A, 2B
combineLatest
Комбинирует последние значения из нескольких Observable
при каждом обновлении любого из них.
Observable.combineLatest(
Observable.just(1, 2).startWith(0),
Observable.just("A", "B"),
(i, s) -> i + s
).subscribe(System.out::println); // Output: 0A, 1A, 1B, 2B
join
Объединяет элементы из двух Observable
на основе определенного условия.
Observable<String> source1 = Observable.just("A", "B", "C");
Observable<Integer> source2 = Observable.just(1, 2, 3);
source1.join(
source2,
item -> Observable.timer(1, TimeUnit.SECONDS), // Соединение через Observable
item -> Observable.timer(1, TimeUnit.SECONDS), // Соединение через Observable
(s, i) -> s + i // Комбинирование элементов
).subscribe(System.out::println);
onErrorResumeNext
Возвращает новый Observable
при возникновении ошибки.
Observable.just(1, 2, 3)
.map(i -> {
if (i == 2) throw new RuntimeException("Error!");
return i;
})
.onErrorResumeNext(Observable.just(10, 20))
.subscribe(System.out::println); // Output: 1, 10, 20
onErrorReturnItem
Возвращает заданный элемент при возникновении ошибки.
Observable.just(1, 2, 3)
.map(i -> {
if (i == 2) throw new RuntimeException("Error!");
return i;
})
.onErrorReturnItem(-1)
.subscribe(System.out::println); // Output: 1, -1
retry
Повторяет попытку при возникновении ошибки.
Observable.just(1, 2, 3)
.map(i -> {
if (i == 2) throw new RuntimeException("Error!");
return i;
})
.retry(1)
.subscribe(System.out::println); // Output: 1, 2, 3
retryWhen
Позволяет управлять повторными попытками при возникновении ошибки.
Observable.just(1, 2, 3)
.map(i -> {
if (i == 2) throw new RuntimeException("Error!");
return i;
})
.retryWhen(errors -> errors.take(1).flatMap(error -> Observable.just(4)))
.subscribe(System.out::println); // Output: 1, 2, 4, 3
delay
Задерживает эмиссию элементов.
Observable.just(1, 2, 3)
.delay(1, TimeUnit.SECONDS)
.subscribe(System.out::println); // Output: 1, 2, 3 (через 1 секунду)
debounce
Эмитирует элементы, которые не были заменены другим элементом в течение заданного времени.
Observable.create(emitter -> {
emitter.onNext("A");
Thread.sleep(100);
emitter.onNext("B");
Thread.sleep(100);
emitter.onNext("C");
emitter.onComplete();
}).debounce(150, TimeUnit.MILLISECONDS)
.subscribe(System.out::println); // Output: C
scan
Применяет функцию накопления к каждому элементу и предыдущему результату.
Observable.just(1, 2, 3)
.scan(0, (total, next) -> total + next)
.subscribe(System.out::println); // Output: 0, 1, 3, 6
distinct
Удаляет дублирующиеся элементы.
Observable.just(1, 1, 2, 2, 3, 3)
.distinct()
.subscribe(System.out::println); // Output: 1, 2, 3
share
Преобразует холодный источник данных в горячий, позволяя нескольким подписчикам совместно использовать одну и ту же подписку на исходный поток данных.
val observable = Observable.just(1, 2, 3)
val sharedObservable = observable.share()
sharedObservable.subscribe { println("Subscriber 1: $it") }
sharedObservable.subscribe { println("Subscriber 2: $it") }
timer
Cоздает Observable
, который через указанное время эмитит один элемент и завершает выполнение.
Observable.timer(5, TimeUnit.SECONDS)
.subscribe { println("Completed after 5 seconds") }
timeout
Используется для указания максимального времени ожидания между элементами потока. Если за это время не поступает новый элемент, поток выбрасывает ошибку TimeoutException
.
Observable
.just(1, 2, 3)
.delay(2, TimeUnit.SECONDS) // задержка
.timeout(1, TimeUnit.SECONDS) // если больше 1 секунды ожидания, выбросит ошибку
.subscribe(
{ item -> println("Received: $item") },
{ error -> println("Error: $error") }
)
Streams
Observable
Источник данных, который испускает события или элементы данных и позволяет наблюдателям (Observers) подписываться на них. Это основной компонент для работы с реактивным программированием.
Observable.just("Hello", "RxJava")
.map { it.uppercase() }
.filter { it.startsWith("H") }
.subscribe(
{ println("Received: $it") },
{ println("Error: ${it.message}") },
{ println("Completed!") }
)
Single
Тип Observable
, который испускает только одно значение или ошибку. Используется, когда нужно вернуть одно значение, например, результат сетевого запроса.
Single.just("Hello, RxJava")
.map { it.uppercase() }
.subscribe(
{ println("Success: $it") },
{ println("Error: ${it.message}") }
)
Completable
Тип Observable
, который не испускает данные, а лишь сигнализирует о завершении (onComplete
) или ошибке (onError
). Используется для выполнения задач без возвращаемого результата, например, операций записи или удаления.
Completable.fromAction {
println("Executing some action...")
}
.subscribe(
{ println("Action completed!") },
{ error -> println("Error: ${error.message}") }
)
Maybe
Источник, который может либо вернуть одно значение, либо завершиться без результата, либо сообщить об ошибке. Используется для операций, где результат может быть необязательным, например, получение данных из кэша.
Maybe.just("Hello, Maybe")
.filter { it.length > 5 }
.subscribe(
{ println("Success: $it") },
{ println("Error: ${it.message}") },
{ println("No data emitted") }
)
Schedulers
Набор планировщиков, которые управляют выполнением потоков для асинхронной обработки данных. Они определяют, где и как будет исполняться работа, например, в фоновом, основном или вычислительном потоке.
Schedulers.io
Предоставляет пул потоков для задач ввода-вывода, таких как сетевые запросы, чтение файлов или работу с базой данных. Пул автоматически масштабируется под нагрузку.
fun main() {
Observable.just("File 1", "File 2")
.subscribeOn(Schedulers.io())
.subscribe { println("Processing: $it on ${Thread.currentThread().name}") }
}
Schedulers.computation
Предоставляет пул потоков, оптимизированный для вычислительных задач, таких как обработка данных или сложные расчёты. Количество потоков равно числу процессоров устройства.
fun main() {
Observable.range(1, 5)
.subscribeOn(Schedulers.computation())
.map { it * it } // Вычисление квадрата
.subscribe { println("Result: $it on ${Thread.currentThread().name}") }
}
Schedulers.single
Выполняет все задачи на одном общем потоке, предназначенном для последовательного выполнения. Это полезно для операций, где важна строгая последовательность выполнения.
fun main() {
Observable.just("Task 1", "Task 2")
.subscribeOn(Schedulers.single())
.subscribe { println("Processing: $it on ${Thread.currentThread().name}") }
}
Schedulers.trampoline
Запускает задачи в текущем потоке последовательно, помещая их в очередь. Новая задача выполняется только после завершения предыдущей. Это полезно для тестирования или строгой последовательности задач в одном потоке.
Observable.just("Task 1", "Task 2")
.subscribeOn(Schedulers.trampoline())
.subscribe { println("Processing: $it on ${Thread.currentThread().name}") }
Schedulers.newThread
Создаёт новый поток для каждой задачи. Используется, когда требуется изолированный поток выполнения, например, для независимых операций.
fun main() {
Observable.just("Task 1", "Task 2")
.subscribeOn(Schedulers.newThread())
.subscribe { println("Processing: $it on ${Thread.currentThread().name}") }
}
Schedulers.from(Executor)
Позволяет создавать кастомный планировщик (Scheduler
) на основе переданного Executor
. Это полезно, когда нужно контролировать управление потоками, например, задавать пул потоков.
fun main() {
val executor = Executors.newFixedThreadPool(2)
val scheduler = Schedulers.from(executor)
Observable.just("Task 1", "Task 2")
.subscribeOn(scheduler)
.subscribe { println("Processing: $it on ${Thread.currentThread().name}") }
executor.shutdown()
}
AndroidSchedulers.mainThread
Возвращает Scheduler
для выполнения операций в главном (UI) потоке Android. Используется, когда нужно обновить UI после выполнения асинхронных задач.
Observable.just("Hello, RxJava")
.subscribeOn(Schedulers.io()) // Асинхронная задача выполняется в фоновом потоке
.observeOn(AndroidSchedulers.mainThread()) // Результат возвращается в UI-поток
.subscribe { value ->
textView.text = value // Обновляем UI
}
Disposable
Механизм управления ресурсами, позволяющий отменять подписки на потоки данных и освобождать ресурсы.
fun main() {
val observable = Observable.just("Hello", "World")
val disposable: Disposable = observable.subscribe(
{ item -> println(item) }, // onNext
{ error -> println("Error: ${error.message}") }, // onError
{ println("Completed") } // onComplete
)
// Отмена подписки
disposable.dispose()
// Проверка, была ли подписка отменена
println("Disposed: ${disposable.isDisposed}")
}
dispose
Отменяет подписку и освобождает ресурсы.
disposable.dispose()
isDisposed
Проверяет, была ли подписка отменена.
val isDisposed = disposable.isDisposed()
CompositeDisposable
Позволяет управлять несколькими объектами Disposable
одновременно. Он упрощает управление подписками, особенно когда у вас есть несколько потоков данных, на которые вы подписаны, и вы хотите отменить их все в одном месте.
fun main() {
// Создание CompositeDisposable для управления подписками
val compositeDisposable = CompositeDisposable()
// Создание Observable
val observable1 = Observable.just("Hello")
val observable2 = Observable.just("World")
// Подписка на первый Observable
val disposable1: Disposable = observable1.subscribe(
{ item -> println(item) },
{ error -> println("Error: ${error.message}") },
{ println("Completed first observable") }
)
// Подписка на второй Observable
val disposable2: Disposable = observable2.subscribe(
{ item -> println(item) },
{ error -> println("Error: ${error.message}") },
{ println("Completed second observable") }
)
// Добавление Disposables в CompositeDisposable
compositeDisposable.addAll(disposable1, disposable2)
// Отмена всех подписок
compositeDisposable.dispose()
// Проверка, были ли подписки отменены
println("All disposed: ${compositeDisposable.size() == 0}")
}
Flowable
Предназначенный для работы с большими потоками данных или с данными, которые поступают с высокой частотой. Flowable
решает проблему перегрузки потока с помощью механизма backpressure, позволяющего регулировать скорость отправки данных и обработки.
Flowable.range(1, 10)
.map { it * 2 }
.subscribe(
{ value -> println("Получено: $value") },
{ error -> println("Ошибка: $error") }
)
BackpressureStrategy.MISSING
Cтратегия по умолчанию, которая не применяет никакой политики к backpressure. Если источник данных генерирует события слишком быстро, возникает ошибка MissingBackpressureException
.
Flowable.create<Int>({ emitter ->
for (i in 1..1000) {
emitter.onNext(i)
}
emitter.onComplete()
}, BackpressureStrategy.MISSING)
.observeOn(Schedulers.io())
.subscribe(
{ value -> println("Получено: $value") },
{ error -> println("Ошибка: $error") } // Вероятно, будет MissingBackpressureException
)
BackpressureStrategy.BUFFER
Эта стратегия сохраняет все данные в буфере. Подходит, если данных не слишком много. Однако, если данных слишком много, может привести к переполнению памяти.
Flowable.create<Int>({ emitter ->
for (i in 1..1000) {
emitter.onNext(i)
}
emitter.onComplete()
}, BackpressureStrategy.BUFFER)
.observeOn(Schedulers.io())
.subscribe(
{ value -> println("Получено: $value") },
{ error -> println("Ошибка: $error") }
)
BackpressureStrategy.DROP
Пропускает новые данные, если потребитель не успевает их обрабатывать. Полезно для обработки событий, где важно только последнее значение, а не весь поток.
Flowable.interval(1, TimeUnit.MILLISECONDS)
.onBackpressureDrop()
.observeOn(Schedulers.io())
.subscribe(
{ value -> println("Получено: $value") },
{ error -> println("Ошибка: $error") }
)
BackpressureStrategy.LATEST
Сохраняет только последнее значение, отбрасывая остальные. Подходит, если важно всегда иметь доступ к самому последнему значению.
Flowable.interval(1, TimeUnit.MILLISECONDS)
.onBackpressureLatest()
.observeOn(Schedulers.io())
.subscribe(
{ value -> println("Получено: $value") },
{ error -> println("Ошибка: $error") }
)
BackpressureStrategy.ERROR
Выбрасывает ошибку MissingBackpressureException
, если данных слишком много. Используется, когда не нужно обрабатывать данные при перегрузке.
Flowable.create<Int>({ emitter ->
for (i in 1..1000) {
emitter.onNext(i)
}
emitter.onComplete()
}, BackpressureStrategy.ERROR)
.observeOn(Schedulers.io())
.subscribe(
{ value -> println("Получено: $value") },
{ error -> println("Ошибка: $error") } // Будет MissingBackpressureException
)
Subjects
Subject – это абстрактный класс в RxJava, одновременно расширяющий класс Observable
и реализующий интерфейс Observer
. Subject – это hot observable. Виды: Publish, Replay, Behavior, Async, Unicast.
PublishSubject
Работает как и Observable
, и Observer
одновременно. Он начинает отправлять данные подписчикам только после их подписки, игнорируя все предыдущие события.
• Не сохраняет предыдущие данные. Новые подписчики получают только последующие события.
• Завершает поток с помощью onComplete()
.
• Может отправить ошибку через onError()
. Если onError()
был вызван, подписчики больше не будут получать данные.
val subject = PublishSubject.create<String>()
// Подписчик 1
subject.subscribe { value ->
println("Подписчик 1: $value")
}
subject.onNext("Первое событие") // Подписчик 1 получит "Первое событие"
// Подписчик 2 подписывается после первого события
subject.subscribe { value ->
println("Подписчик 2: $value")
}
subject.onNext("Второе событие")
// Оба подписчика получат "Второе событие"
ReplaySubject | Подписчики получают все данные, отправленные до подписки и все новые данные с момента подписки (студент опоздал на лекцию, но получает ее содержимое с самого начала). Если ReplaySubject создается фабричным методом createWithSize(size: Int), то подписчики будут получать только заданное количество элементов, отправленных в прошлом. |
BehaviorSubject | Подписчики получат последний элемент данных, разосланный до подписки и все новые данные с момента подписки. |
AsyncSubject | Подписчики получат только самый последний элемент данных, который был отправлен перед вызовом onComplete (студент пришел на пару и получил только домашку). |
UnicastSubject | Работает также как ReplaySubject но может иметь только одного подписчика. Все последующие подписчики получают onError с IllegalStateException . |
Вопросы на собесе (36)
Streams (3)
- Какие основные стримы существуют в RxJava?
Observable
Single
Completable
Maybe
- Что такое Observable и как он работает?
Observable
представляет поток данных, который эмитирует значения и позволяет подписаться на них для обработки. Подписка запускает поток данных и обрабатывает его изменения.
- В чем разница между Observable и Single?
Observable
может эмитировать несколько значений или событий, тогда какSingle
эмитирует только одно значение или ошибку.Single
предназначен для сценариев, где ожидается лишь одно результатное значение, например, при выполнении асинхронной операции.
- Какие основные стримы существуют в RxJava?
Subjects (2)
- Что такое Subject и чем оно отличается от Observable?
Subject
— это объект, который позволяет как подписываться на события, так и эмитировать их. В отличие отObservable
, который только эмитирует данные,Subject
может действовать как источник и получатель данных.
- Какой тип Subject хранит только последний элемент и передает его новым подписчикам?
•
PublishSubject
•
BehaviorSubject
•
ReplaySubject
•
AsyncSubject
- Что такое Subject и чем оно отличается от Observable?
Operators (15)
- Как объединить несколько Observable потоков в один?
Используйте операторы
merge
,concat
, илиzip
.
- Как обрабатывать ошибки в RxJava?
Через
onError
,retry
иonErrorResumeNext
.
- Как работает оператор zip в RxJava?
Оператор
zip
объединяет несколько источников, комбинируя их элементы поочередно, пока все источники не завершатся. Он использует функцию для объединения соответствующих элементов и передает результат в новыйObservable
.
- Как сделать параллельные запросы в RxJava?
Использовать операторы
zip()
илиmerge()
.
- Разница между операторами map, flatMap, concatMap и switchMap в RxJava?
•
map
преобразует каждое значение, эмитированноеObservable
, в другое значение, возвращая одиночный элемент на каждое входное значение.•
flatMap
преобразует каждое значение в новыйObservable
и объединяет результаты асинхронно, что может привести к перемешиванию последовательности элементов.•
concatMap
аналогиченflatMap
, но сохраняет порядок, обрабатывая каждыйObservable
поочерёдно, не смешивая результаты.•
switchMap
преобразует значение в новыйObservable
, отменяя предыдущийObservable
, если приходит новое значение, что полезно для обработки только последнего запроса.
- Есть 2 источника Observable, первый испускает элементы A, второй - элементы Б. В случае применения switchMap какой будет результат?
При использовании
switchMap
результатом будет только последовательность элементов из второгоObservable
, испускающего элементы Б. Это происходит потому, чтоswitchMap
отписывается от предыдущегоObservable
(первого, который испускает A), как только второйObservable
(Б) начинает испускать значения. Таким образом, элементы A будут проигнорированы, и результатом будут только элементы Б.
- Есть 2 источника Observable, первый испускает элементы A, второй - элементы Б. В случае применения concatMap какой будет результат?
При использовании
concatMap
результатом будет последовательность, содержащая сначала все элементы A из первогоObservable
, а затем все элементы Б из второгоObservable
, так какconcatMap
обрабатывает каждый источник поочерёдно, сохраняя порядок.
- Какой оператор используется для объединения данных из двух Observable на основе условия?
•
join
•
combineLatest
•
merge
•
zip
- Какой оператор используется для объединения нескольких Observable, при этом дожидаясь завершения всех источников?
•
zip
•
combineLatest
•
concat
•
merge
- Какой оператор используется для объединения нескольких источников данных, при этом элементы могут излучаться вперемешку?
•
concat
•
zip
•
combineLatest
•
merge
- Какой оператор используется для выполнения побочных эффектов при подписке на Observable?
•
doOnNext
•
doOnComplete
•
doOnSubscribe
•
doOnError
- Какой оператор используется для выполнения действия после окончания работы Observer?
•
doAfterComplete
•
doOnNext
•
doOnComplete
•
doOnTerminate
- Какой оператор используется для пропуска первых N элементов, излучаемых Observable?
•
ignore
•
filter
•
take
•
skip
- Какой из перечисленных источников данных RxJava либо успешно завершает свою работу без какого-либо возвращаемого значения, либо выбрасывает исключение?
•
Observable
•
Flowable
•
Maybe
•
Completable
- Какой оператор используется для обработки ошибок в RxJava?
•
onErrorResumeNext
•
onError
•
handleError
•
catchError
- Как объединить несколько Observable потоков в один?
Disposable (2)
- Что такое Disposable и как управлять жизненным циклом Observable?
Disposable
используется для управления жизненным цикломObservable
и отмены подписки, освобождая ресурсы. Для этого сохраняйтеDisposable
и вызывайтеdispose()
при необходимости.
- Какой из следующих классов позволяет управлять подписками в RxJava?
•
Disposable
•
Observer
•
Publisher
•
Subscriber
- Что такое Disposable и как управлять жизненным циклом Observable?
Другие (14)
- Что такое backpressure и как его обрабатывать в RxJava?
Backpressure возникает, когда поток данных поступает быстрее, чем может быть обработан. Обрабатывать его можно с помощью операторов, таких как
onBackpressureBuffer
,onBackpressureDrop
илиonBackpressureLatest
, которые управляют тем, как данные накапливаются или сбрасываются, когда система перегружена.
- Почему возникает UndeliverableException?
Возникает, когда исключение не может быть доставлено подписчику, обычно из-за завершения подписки или отсутствия обработчика ошибок. Оно может возникнуть, если ошибка происходит после завершения подписки или если ошибка не обрабатывается в методе onError. Для предотвращения можно настроить глобальный обработчик ошибок с помощью
RxJavaPlugins.setErrorHandler
.
- Что такое Flowable?
Flowable
— это один из типов источников данных, предназначенный для работы с асинхронными потоками данных. Он поддерживает обратный поток (backpressure), что позволяет контролировать скорость, с которой данные отправляются подписчикам, и предотвращает переполнение буфера при обработке больших объемов данных.
- Какие источники в RxJava являются холодными и горячими?
• Холодные источники (например,
Observable
,Flowable
,Single
,Maybe
) начинают испускать данные только при подписке на них. Каждая подписка получает свой собственный набор данных, что позволяет избежать дублирования.• Горячие источники (например,
Subject
,BehaviorSubject
,ReplaySubject
) испускают данные независимо от подписчиков. Подписчики могут пропустить данные, если они подписываются после того, как данные уже были отправлены.
- Как превратить холодный источник данных в горячий в RxJava?
• Использовать
Subject
.val subject = PublishSubject.create<DataType>() coldSource.subscribe(subject)
• Использовать оператор
share()
.val hotObservable = coldSource.share()
- На каком Shedulers будет выполнен оператор filter() в данной rx-цепочке?
Observable.just("1") .subscribe0n(Schedulers.io()) .map {} .observe0n(Schedulers.trampoline()) .filter {} .observe0n(AndroidSchedulers.mainThread()) .subscribe {}
На
Schedulers.trampoline()
, так какobserveOn(Schedulers.trampoline())
меняет поток для всех последующих операторов до следующего вызоваobserveOn()
.
- Что выведет println?
fun main() { val orderIds: List<Long> = listOf(1, 2, 3) loadAllOrders(orderIds) .subscribel { orders -> val result = orderIds = orders.map(Order::id) println(result) }) } private fun loadAllOrders(orderIds: List<Long>): Single<List<Order>>{ return Observable.fromIterable(orderIds) .subscribeOn(Schedulers.io)) .flatMapSingle(::loadOrderById) .toList() }
flatMap
не гарантирует порядок выдаваемых значений, поэтому результат может быть иtrue
иfalse
. Чтобы всегда былоtrue
,flatMap
нужно поменять наconcatMap
. Он будет работать дольше. На больших данных заметно дольше.Решение: после метода
toList
добавить сортировку.
- На каком потоке вызывается код в doOnSubscribe?
fun updateRegions() { getRegions() .subscribeOn(Schedulers.single()) .observen(Schedulers.computation()) .doOnSubscribe { showProgress() } .subscribeOn(Schedulers.io)) .subscribe { regions -> showRegions(regions) } }
Правильный ответ -
io
. Поток дляdoOnSubscribe
задается первыйsubscribeOn
после него.
- На каком потоке вызывается код в flatMap?
fun updateRegions() { getRegions() .subscribeOn(Schedulers.single()) .observeOn(Schedulers.computation()) .flatMap { getCity() } .subscribeOn(Schedulers.io)) .subscribe( { regions -> showRegions (regions) } ) }
Код внутри
flatMap
вызывается на том потоке, который был установлен последним перед ним, то есть наSchedulers.computation
из вызоваobserveOn
.
- На каком потоке вызывается код в onSuccess?
fun updateRegions() { getRegions() .observen(Schedulers.computation()) .observeOn(Schedulers.io)) .subscribe( { regions -> showRegions(regions) } ) }
Метод
onSuccess
вsubscribe
вызывается на потоке, который был установлен последним вызовомobserveOn
. В данном случае, этоSchedulers.io
, поэтому код внутри лямбдыsubscribe
(включая вызовshowRegions(regions)
) выполнится на IO-потоке.
- На каком потоке вызывается код в onSuccess?
fun updateRegions() { getRegions() .subscribeOn(Schedulers.computation()) .subscribeOn(Schedulers.io()) .subscribe( { regions -> showRegions (regions) } }
Правильный ответ -
computation
. Будет отрабатывать тотsubscribeOn
который находится ближе всего к источнику - остальныеsubscribeOn
будут игнорироваться. Не имеет значения в каком месте ветки указанsubscribeOn
.
- На каком потоке вызывается код в onSuccess?
fun updateRegions() { getRegions () .subscribeOn(Schedulers.io)) .subscribe( { regions -> showRegions(regions) } }
Поток, на котором будет вызван метод
onSuccess
, зависит от использования операторовsubscribeOn
иobserveOn
. ЗдесьsubscribeOn(Schedulers.io)
указывает, что работа с источником данныхgetRegions
будет выполняться в потоке из пулаSchedulers.io
. Однако, методsubscribe
по умолчанию работает в том же потоке, где происходит подписка, если не используетсяobserveOn
.
- На каком потоке вызывается код в onSuccess?
fun updateRegions() { getRegions() .subscribe( { regions -> showRegions (regions) } }
По умолчанию
onSuccess
вызывается на том потоке, на котором произошло завершение работы источника данных. Если вы не указали явноsubscribeOn
иobserveOn
, то выполнение будет происходить в том же потоке, где выполняетсяgetRegions
.
- На каком потоке вызывается код в onSuccess?
fun updateRegions() { getRegions() .subscribeOn(Schedulers.io)) .observeOn(Schedulers.computation()) .subscribe( { regions -> showRegions(regions) } ) }
Правильный ответ - на
computation
. Поток запустится наio
.subscribeOn
отвечает за методы выше.observeOn
отвечает за методы ниже. Результат выведется на ближайшемobserveOn
.
- Что такое backpressure и как его обрабатывать в RxJava?