Flow
https://developer.android.com/kotlin/flow |
https://kotlinlang.org/docs/flow.html |
Типы функций над потоками?
В зависимости от того, возвращают они конкретное значение или обработанный поток функции делятся на: терминальные и промежуточные. Терминальные функции потоков представляют suspend-функции, которые позволяют получать объекты из потока или возвращают какое-то конечное значение: collect, toList, toSet first, firstOrNull, last, lastOrNull, single, singleOrNull, count, reduce, fold. Промежуточные функции не являются suspend, принимают поток и возвращают обработанный поток: combine, drop, filter, filterNot, filterNotNull, map, onEach, take, transform, zip.
Backpressure
У Flow backpressure заложена в Kotlin suspending functions. Если сборщик flow не может принимать новые данные в настоящий момент, он приостанавливает источник. Возобновление происходит позднее, когда сборщик flow снова сможет получать данные. Таким образом, в Kotlin нет необходимости выбирать тип источника данных, в отличие от RxJava.
Flow
Cold flow. Асинхронный поток данных который последовательно эмитит данные и завершается успешно или с исключением. Похож на Observable
в RxJava.
emit
Метод передачи события, suspend function.
tryEmit
Попытается передать событие без приостановки.
callbackFlow
val locationsSource: Flow<Location> = callbackFlow<Location> {
val callback = object : LocationCallback() {
override fun onLocationResult(result: LocationResult?) {
result ?: return
offer(result.lastLocation)
}
}
requestLocationUpdates(createLocationRequest(), callback, Looper.getMainLooper())
.addOnFailureListener { e ->
close(e) // in case of exception, close the Flow
}
awaitClose { // clean up when Flow collection ends
removeLocationUpdates(callback)
}
}
flowOf
Cоздает поток из набора переданных в функцию значений.
val numberFlow: Flow<Int> = flowOf(1, 2, 3, 5, 8)
asFlow
Стандартные коллекции и последовательности в Kotlin имеют метод расширения asFlow(), который позволяет преобразовать коллекцию или последовательность в поток.
val numberFlow: Flow<Int> = (1..5).asFlow()
val nameFlow: Flow<String> = listOf("Tom", "Sam", "Bob").asFlow()
flowOn
Изменить CoroutineContext
потока. flowOn
изменяет CoroutineContext
восходящего потока, то есть производитель и любые промежуточные операторы, применяемые до (или выше) flowOn
. Если имеется несколько операторов flowOn
, каждый из них изменяет восходящий поток относительно своего текущего местоположения.
flowOf("A", "B", "C")
.catch { Timber.e(it) }
.flowOn(Dispatchers.IO)
.collect {}
collect
Используется для сбора данных из потока и выполнения действий над каждым элементом этого потока.
• Блокирует текущий поток до тех пор, пока все данные не будут собраны или поток не завершится.
• Если нужно обработать ошибки при сборе данных, используйте try-catch внутри метода collect или применяйте операторы для обработки ошибок.
fun main() = runBlocking {
// Пример создания простого потока
val flow = flow {
for (i in 1..5) {
delay(1000) // Имитируем задержку
emit(i) // Излучаем элемент
}
}
// Сбор данных из потока
flow.collect { value ->
println("Received: $value")
}
}
collectLatest
Оператор терминального потока, который собирает данный поток с заданным действием. Принципиальное отличие от collect заключается в том, что когда исходный поток выдает новое значение, блок действий для предыдущего значения отменяется.
flow {
emit(1)
delay(50)
emit(2)
}.collectLatest { value ->
println("collect $value")
delay(100)
println("$value collected")
} // collect 1, collect 2, 2 collected
catch
Оператор обработки исключений.
newsRepository.favoriteLatestNews
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
toList
Преобразует поток значений в коллекцию List.
val numberFlow: Flow<Int> = flowOf(1, 2, 3)
val list: MutableList<Int> = mutableListOf()
numberFlow.toList()
numberFlow.toList(list)
toSet
Преобразует поток значений в коллекцию Set.
val numberFlow: Flow<Int> = flowOf(1, 2, 3)
val set: LinkedHashSet<Int> = linkedSetOf()
numberFlow.toSet()
numberFlow.toSet(set)
toCollection
Collects given flow into a collection.
val numberFlow: Flow<Int> = flowOf(1, 2, 3)
val collection: MutableList<Int> = mutableListOf()
numberFlow.toCollection(collection)
first
Получает первый объект из потока.
val nameFlow: Flow<String> = listOf("Tom", "Bob", "Kate", "Sam", "Alice").asFlow()
nameFlow.first() // Tom
nameFlow.first { name -> name.length > 3 } // Kate
firstOrNull
Получает первый объект из потока или null, если список пустой или условию не соответствует ни один из элементов потока.
val nameFlow: Flow<String> = listOf<String>().asFlow()
nameFlow.firstOrNull()
nameFlow.firstOrNull { name -> name.length > 3 }
last
Получает последний объект из потока.
val nameFlow: Flow<String> = listOf("Tom", "Bob", "Kate", "Sam", "Alice").asFlow()
nameFlow.last() // Alice
lastOrNull
Получает последний объект из потока или null.
val nameFlow: Flow<String> = listOf<String>().asFlow()
nameFlow.lastOrNull() // null
single
Возвращает единственный элемент потока, если поток содержит только один элемент. Если поток не содержит элементов генерируется исключение NoSuchElementException, а если в потоке больше одного элемента - исключение IllegalStateException.
val nameFlow: Flow<String> = listOf("Tom").asFlow()
nameFlow.single() // Tom
singleOrNull
Ожидает получение одного объекта из потока или null, если поток пуст или если в потоке больше одного элемента.
val nameFlow: Flow<String> = listOf("Tom", "Bob").asFlow()
nameFlow.singleOrNull() // null
count
Получить количество элементов в потоке.
val nameFlow: Flow<String> = listOf("Tom", "Bob", "Kate", "Sam", "Alice").asFlow()
nameFlow.count() // 5
nameFlow.count { name -> name.length > 3 } // 2
reduce
Получает результат определенной операции над элементами потока. Сводит все значения потока к одному значению. Первый параметр при первом запуске представляет первый объект потока, а при последующих запусках - результат функции над предыдущими объектами. А второй параметр функции - следующий объект.
val numberFlow: Flow<Int> = listOf(1, 2, 3).asFlow()
numberFlow.reduce { accumulator, value -> accumulator + value } // 6
val nameFlow: Flow<String> = listOf("Tom", "Bob", "Kate", "Sam", "Alice").asFlow()
nameFlow.reduce { accumulator, value -> "$accumulator $value" } // Tom Bob Kate Sam Alice
fold
Получает результат определенной операции над элементами потока, в отличие от функции reduce() принимает начальное значение. Сводит все элементы потока в один.
val nameFlow: Flow<String> = listOf("Tom", "Bob", "Kate", "Sam", "Alice").asFlow()
nameFlow.fold("Names:") { accumulator, value -> "$accumulator $value" } // Names: Tom Bob Kate Sam Alice
combine
Объединяет два потока в один, после применения к их элементам функции преобразования. Последние элементы потоков всегда объединяются.
flowOf("A", "B")
.combine(flowOf("1", "2", "3")) { a, b ->
a + b
} // A1, B1, B2, B3
combineTransform
Возвращает поток, значения которого генерируются функцией преобразования, обрабатывающей последние переданные значения каждым потоком.
combineTransform(
flowOf("A", "B", "C"),
flowOf("1", "2", "3")
) { letter, number ->
val result = "$letter$number"
emit(result)
} // A1, B1, B2, C2, C3
drop
Удаляет из потока определенное количество элементов. В качестве параметра принимает количество элементов с начала потока, которые надо убрать.
val nameFlow: Flow<String> = listOf("Tom", "Bob", "Kate", "Sam", "Alice").asFlow()
nameFlow.drop(3) // Sam, Alice
dropWhile
Удаляет из потока элементы, пока они не начнут соответствовать некоторому условию.
val peopleFlow: Flow<Person> = listOf(
Person("Tom", 37),
Person("Alice", 32),
Person("Bill", 5),
Person("Sam", 14),
Person("Bob", 25)
).asFlow()
peopleFlow.dropWhile { person -> person.age > 17 } // Bill, Sam, Bob
filter
Фильтрует объекты в потоке.
val peopleFlow: Flow<Person> = listOf(
Person("Tom", 37),
Person("Bill", 5),
Person("Sam", 14),
Person("Bob", 21)
).asFlow()
peopleFlow.filter { person -> person.age > 17 } // Tom, Bob
filterNot
Фильтрует поток, оставляя те элементы, которые не соответствуют условию.
val peopleFlow: Flow<Person> = listOf(
Person("Tom", 37),
Person("Bill", 5),
Person("Sam", 14),
Person("Bob", 21)
).asFlow()
peopleFlow.filterNot { person -> person.age > 17 } // Bill, Sam
filterNotNull
Фильтрует поток, удаляя все элементы, которые равны null.
val peopleFlow: Flow<Person?> = listOf(
Person("Tom", 37),
null
Person("Sam", 14),
null
).asFlow()
peopleFlow.filterNotNull() // Tom, Sam
filterIsInstance
val anyFlow: Flow<Any> = listOf(
"Alice",
1,
"Bob",
2
).asFlow()
anyFlow.filterIsInstance<Int>() // 1, 2
map
Преобразует данные потока. В качестве параметра он принимает функцию преобразования. Функция преобразования принимает в качестве единственного параметра объект из потока и возвращает преобразованные данные.
val peopleFlow: Flow<Person> = listOf(
Person("Tom", 37),
Person("Sam", 41),
Person("Bob", 21),
Person("Bill", 5)
).asFlow()
peopleFlow.map { person -> person.name } // Tom, Sam, Bob, Bill
peopleFlow
.map { person -> object {
val name: String = person.name
val isAdult: Boolean = person.age > 17
} }
mapNotNull
Возвращает поток, содержащий только ненулевые результаты применения заданной функции преобразования к каждому значению исходного потока.
val peopleFlow: Flow<Person?> = listOf(
Person("Tom", 37),
null,
Person("Sam", 41),
null,
Person("Bob", 21),
null,
Person("Bill", 5)
).asFlow()
peopleFlow.mapNotNull { person -> person?.name } // Tom, Sam, Bob, Bill
onEach
Применяет к элементам потока определенную функцию перед тем, как они будут переданы в возвращаемый поток.
val numberFlow: Flow<Int> = flowOf(1, 2, 3)
numberFlow.onEach { delay(1000L) }
take
Ограничивает количество элементов в потоке. В качестве параметра принимает количество элементов с начала потока, которые надо оставить.
val nameFlow: Flow<String> = listOf("Tom", "Bob", "Kate", "Sam", "Alice").asFlow()
nameFlow.take(3) // Tom, Bob, Kate
takeWhile
Выбирает из потока элементы, пока будет истино некоторое условие.
val peopleFlow: Flow<Person> = listOf(
Person("Tom", 37),
Person("Alice", 32),
Person("Bill", 5),
Person("Sam", 14),
Person("Bob", 25)
).asFlow()
peopleFlow.takeWhile { person -> person.age > 17 } // Tom, Alice
transform
Выполняем преобразование объектов в потоке. В отличие от map позволяет использовать функцию emit(), чтобы передавать в поток произвольные объекты.
val peopleFlow: Flow<Person> = listOf(
Person("Tom", 37),
Person("Bill", 5),
Person("Sam", 14),
Person("Bob", 21)
).asFlow()
peopleFlow
.transform { person ->
if (person.age > 17) {
emit(person.name)
}
} // Tom, Bob
val numberFlow: Flow<Int> = listOf(2, 3, 4).asFlow()
numberFlow
.transform { number ->
emit(number)
emit(number * number)
} // 2, 4, 3, 9, 4, 16
transformLatest
Возвращает поток, который создает элемент с помощью функции преобразования каждый раз, когда исходный поток выдает значение. Когда исходный поток выдает новое значение, предыдущий блок преобразования отменяется.
flow {
emit("A")
delay(100)
emit("B")
}.transformLatest { value ->
emit(value)
delay(200)
emit(value + "_last")
} // A, B, B_last
transformWhile
Применяет функцию преобразования к каждому значению данного потока, пока эта функция возвращает значение true.
zip
Позволяет объединить 2 потока данных. Оператор zip принимает два параметра. Первый параметр - поток данных, с которым надо выполнить объединение. Второй параметр - собственно функция объединения. Она принимает соответствующие элементы обоих потоков в качестве параметров и возвращает результат их объединения.
val nameFlow: Flow<String> = listOf("Tom", "Bob", "Sam").asFlow()
val ageFlow: Flow<Int> = listOf(37, 41, 25).asFlow()
nameFlow
.zip(ageFlow) { name, age -> Person(name, age) }
.collect { person -> println("Name: ${person.name}, Age: ${person.age}") }
debounce
Не позволяет выполнить операцию, пока не истечет установленный таймер. Удобно для полей ввода текста. После получения события запускается таймер. Если новое событие приходит, когда таймер активен - таймер перезапускается. Если таймер истек, мы испускаем последний испущенный элемент.
flow {
emit(1)
delay(90)
emit(2)
delay(90)
emit(3)
delay(1010)
emit(4)
delay(1010)
emit(5)
}.debounce(1000) // 3, 4, 5
sample
Подобно debounce используется для фильтрации элементов потока, но имеет отличие - вместо проверка интервала от последнего элемента запускается периодически и отправляет последний элемент за интервал. Например, поток выдает элементы каждые 50 мс, и если применить выборку в 100 мс, получим каждый второй элемент. С оператором debounce получили бы только элемент 9.
flow {
repeat(10) {
emit(it)
delay(50)
}
}.sample(100) // 1, 3, 5, 7, 9
flatMapMerge
Сопоставляет каждый элемент потока с новым потоком, представленным операцией преобразования, а затем объединяет элементы этих потоков и сглаживает его.
flowOf(1, 2, 3)
.flatMapMerge { number: Int ->
flowOf("$number A", "$number B")
} // 1A, 1B, 2A, 2B, 3A, 3B
flatMapConcat
Подобен flatMapMerge, но здесь потоки объединяются, а не сливаются. Важно: flatmapMerge имеет лучшую производительность, поскольку может обрабатывать потоки параллельно.
flowOf(1, 2, 3)
.flatMapConcat { number: Int ->
flowOf("$number A", "$number B")
} // 1A, 1B, 2A, 2B, 3A, 3B
flatMapLatest
Возвращает поток, который переключается на новый поток, создаваемый функцией преобразования, каждый раз, когда исходный поток выдает значение. Когда исходный поток выдает новое значение, предыдущий поток, созданный блоком преобразования, отменяется.
flow {
emit("A")
delay(100)
emit("B")
}.flatMapLatest { value ->
flow {
emit(value)
delay(200)
emit(value + "_last")
}
} // A, B, B_last
buffer
Не меняет поток элементов, а помогает с производительностью. Если мы добавим буфер между операторами onEach и collect, он создаст отдельную сопрограмму для параллельного сбора элементов из оператора onEach в соответствии с предоставленной емкостью буфера. Это позволит нам собрать все элементы за 300 мс, так как все операторские операции не должны выполняться последовательно.
flowOf("A", "B", "C")
.onEach { delay(300) } // Output takes 900ms as all items are processed sequentially
flowOf("A", "B", "C")
.onEach { delay(300) }
.buffer(3) // Output takes 300ms as all items are processed in parallel at buffer operator
// and delivered to collect immediately.
onEmpty
Вызывает действие, если поток завершается без испускания элементов.
val nameFlow: Flow<String> = flowOf()
nameFlow.onEmpty {
emit("Bob")
emit("Alice")
} // Bob, Alice
conflate
В случае нескольких вызовов emit равно обработает все, но соберет только последний вызов.
StateFlow
StateFlow
Hot flow. Это специализация SharedFlow
. Доставляет последнее событие только новым подписчикам
• есть возможность синхронно получить value.
• сравнивает предыдущее значение с новым. игнорирует если одинаковые.
MutableStateFlow
Мутабельный StateFlow
.
asStateFlow
Конвертирует мутабельный MutableStateFlow
в иммутабельный StateFlow
(только для чтения).
stateIn
Преобразует холодный поток в горячий. Возвращает оператор StateFlow
. Полезен в ситуациях, когда имеется холодный поток, предоставляющий обновления значения некоторого состояния, создание и/или обслуживание которого требует больших затрат, но имеется несколько подписчиков, которым необходимо собирать самое последнее значение состояния.
val stateFlow: StateFlow<Int> = flow
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5_000),
initialValue = 0
)
update
Метод для обновления данных в MutableStateFlow
.
data class Person(
val name: String
)
private val personFlow = MutableStateFlow(Person("John"))
personFlow.update {
it.copy(name = "Vlad")
}
collectAsState
Используется в Compose. Предоставляет значение этого StateFlow
через State
. Начальное значение - StateFlow.value
. Каждый раз когда в StateFlow
будет отправляться новое значение будет вызываться рекомпозиция.
private val _numberFlow: MutableStateFlow<Int> = MutableStateFlow(42)
val numberFlow: StateFlow<Int> = _numberFlow.asStateFlow()
val number: Int by viewModel.numberFlow.collectAsState()
collectAsStateWithLifecycle
Предоставляет значение этого StateFlow
через State
учитывая жизненный цикл. Каждый раз когда в StateFlow
будет отправляться новое значение будет вызываться рекомпозиция если жизненный цикл lifecycleOwner
равен хотя бы minActiveState
(Lifecycle.State.STARTED
). Сбор данных останавливается если жизненный цикл падает ниже минимального и перезапускается если наоборот.
private val _numberFlow: MutableStateFlow<Int> = MutableStateFlow(42)
val numberFlow: StateFlow<Int> = _numberFlow.asStateFlow()
val number: Int by viewModel.numberFlow.collectAsStateWithLifecycle()
SharedFlow
SharedFlow
Hot flow. Генерирует события, даже если вы не вызываете collect
на нем
• нет возможность синхронно получить value.
• всегда отправляет новые значения, даже если они одинаковые.
• может иметь несколько подписчиков.
MutableSharedFlow
Мутабельный SharedFlow
.
asSharedFlow
Конвертирует MutableSharedFlow
в иммутабельный SharedFlow
(только для чтения).
shareIn
Преобразует холодный поток в горячий. Возвращает оператор SharedFlow
.
val sharedFlow: SharedFlow<Int> = flow
.shareIn(
scope = viewModelScope,
started = SharingStarted.Lazily,
replay = 0
)
Kotlin Coroutines Flow. Вопросы на собесе
- Как можно создавать Flow и какие есть билдеры?
- Как обрабатывать ошибки в Flow?
- Как работает оператор flowOn?
- Как работает оператор collect?
- Разница между StateFlow и SharedFlow?
•
StateFlow
хранит текущее значение и всегда возвращает последнее значение новым подписчикам, обеспечивая сохранение состояния.•
SharedFlow
не сохраняет состояние, а просто транслирует эмитированные значения всем подписчикам, без сохранения их в истории.
- Разница между холодным и горячим потоками в Kotlin?
Холодный поток в Kotlin создается с помощью
Flow
, который начинает эмитировать данные только при подписке. Горячий поток реализуется с помощьюSharedFlow
илиStateFlow
, который продолжает эмитировать данные независимо от подписчиков.