Flow

https://developer.android.com/kotlin/flow
https://kotlinlang.org/docs/flow.html

Типы функций над потоками?

В зависимости от того, возвращают они конкретное значение или обработанный поток функции делятся на: терминальные и промежуточные. Терминальные функции потоков представляют suspend-функции, которые позволяют получать объекты из потока или возвращают какое-то конечное значение: collect, toList, toSet first, firstOrNull, last, lastOrNull, single, singleOrNull, count, reduce, fold. Промежуточные функции не являются suspend, принимают поток и возвращают обработанный поток: combine, drop, filter, filterNot, filterNotNull, map, onEach, take, transform, zip.

Backpressure

У Flow backpressure заложена в Kotlin suspending functions. Если сборщик flow не может принимать новые данные в настоящий момент, он приостанавливает источник. Возобновление происходит позднее, когда сборщик flow снова сможет получать данные. Таким образом, в Kotlin нет необходимости выбирать тип источника данных, в отличие от RxJava.

Flow

Cold flow. Асинхронный поток данных который последовательно эмитит данные и завершается успешно или с исключением. Похож на Observable в RxJava.

emit

Метод передачи события, suspend function.

tryEmit

Попытается передать событие без приостановки.

callbackFlow

val locationsSource: Flow<Location> = callbackFlow<Location> {
    val callback = object : LocationCallback() {
        override fun onLocationResult(result: LocationResult?) {
            result ?: return
            offer(result.lastLocation)
        }
    }
    requestLocationUpdates(createLocationRequest(), callback, Looper.getMainLooper())
        .addOnFailureListener { e ->
            close(e) // in case of exception, close the Flow
        }
    awaitClose { // clean up when Flow collection ends
        removeLocationUpdates(callback)
    }
}

flowOf

Cоздает поток из набора переданных в функцию значений.

val numberFlow: Flow<Int> = flowOf(1, 2, 3, 5, 8)

asFlow

Стандартные коллекции и последовательности в Kotlin имеют метод расширения asFlow(), который позволяет преобразовать коллекцию или последовательность в поток.

val numberFlow: Flow<Int> = (1..5).asFlow()
val nameFlow: Flow<String> = listOf("Tom", "Sam", "Bob").asFlow()

flowOn

Изменить CoroutineContext потока. flowOn изменяет CoroutineContext восходящего потока, то есть производитель и любые промежуточные операторы, применяемые до (или выше) flowOn. Если имеется несколько операторов flowOn, каждый из них изменяет восходящий поток относительно своего текущего местоположения.

flowOf("A", "B", "C")
    .catch { Timber.e(it) }
    .flowOn(Dispatchers.IO)
    .collect {}

collect

Используется для сбора данных из потока и выполнения действий над каждым элементом этого потока.

• Блокирует текущий поток до тех пор, пока все данные не будут собраны или поток не завершится.

• Если нужно обработать ошибки при сборе данных, используйте try-catch внутри метода collect или применяйте операторы для обработки ошибок.

fun main() = runBlocking {
    // Пример создания простого потока
    val flow = flow {
        for (i in 1..5) {
            delay(1000) // Имитируем задержку
            emit(i) // Излучаем элемент
        }
    }

    // Сбор данных из потока
    flow.collect { value ->
        println("Received: $value")
    }
}

collectLatest

Оператор терминального потока, который собирает данный поток с заданным действием. Принципиальное отличие от collect заключается в том, что когда исходный поток выдает новое значение, блок действий для предыдущего значения отменяется.

flow {
    emit(1)
    delay(50)
    emit(2)
}.collectLatest { value ->
    println("collect $value")
    delay(100)
    println("$value collected")
} // collect 1, collect 2, 2 collected

catch

Оператор обработки исключений.

newsRepository.favoriteLatestNews
    .catch { exception -> notifyError(exception) }
    .collect { favoriteNews ->
        // Update View with the latest favorite news
    }

toList

Преобразует поток значений в коллекцию List.

val numberFlow: Flow<Int> = flowOf(1, 2, 3)
val list: MutableList<Int> = mutableListOf()
numberFlow.toList()
numberFlow.toList(list)

toSet

Преобразует поток значений в коллекцию Set.

val numberFlow: Flow<Int> = flowOf(1, 2, 3)
val set: LinkedHashSet<Int> = linkedSetOf()
numberFlow.toSet()
numberFlow.toSet(set)

toCollection

Collects given flow into a collection.

val numberFlow: Flow<Int> = flowOf(1, 2, 3)
val collection: MutableList<Int> = mutableListOf()
numberFlow.toCollection(collection)

first

Получает первый объект из потока.

val nameFlow: Flow<String> = listOf("Tom", "Bob", "Kate", "Sam", "Alice").asFlow()
nameFlow.first() // Tom
nameFlow.first { name -> name.length > 3 } // Kate

firstOrNull

Получает первый объект из потока или null, если список пустой или условию не соответствует ни один из элементов потока.

val nameFlow: Flow<String> = listOf<String>().asFlow()
nameFlow.firstOrNull()
nameFlow.firstOrNull { name -> name.length > 3 }

last

Получает последний объект из потока.

val nameFlow: Flow<String> = listOf("Tom", "Bob", "Kate", "Sam", "Alice").asFlow()
nameFlow.last() // Alice

lastOrNull

Получает последний объект из потока или null.

val nameFlow: Flow<String> = listOf<String>().asFlow()
nameFlow.lastOrNull() // null

single

Возвращает единственный элемент потока, если поток содержит только один элемент. Если поток не содержит элементов генерируется исключение NoSuchElementException, а если в потоке больше одного элемента - исключение IllegalStateException.

val nameFlow: Flow<String> = listOf("Tom").asFlow()
nameFlow.single() // Tom

singleOrNull

Ожидает получение одного объекта из потока или null, если поток пуст или если в потоке больше одного элемента.

val nameFlow: Flow<String> = listOf("Tom", "Bob").asFlow()
nameFlow.singleOrNull() // null

count

Получить количество элементов в потоке.

val nameFlow: Flow<String> = listOf("Tom", "Bob", "Kate", "Sam", "Alice").asFlow()
nameFlow.count() // 5
nameFlow.count { name -> name.length > 3 } // 2

reduce

Получает результат определенной операции над элементами потока. Сводит все значения потока к одному значению. Первый параметр при первом запуске представляет первый объект потока, а при последующих запусках - результат функции над предыдущими объектами. А второй параметр функции - следующий объект.

val numberFlow: Flow<Int> = listOf(1, 2, 3).asFlow()
numberFlow.reduce { accumulator, value -> accumulator + value } // 6

val nameFlow: Flow<String> = listOf("Tom", "Bob", "Kate", "Sam", "Alice").asFlow()
nameFlow.reduce { accumulator, value -> "$accumulator $value" } // Tom Bob Kate Sam Alice

fold

Получает результат определенной операции над элементами потока, в отличие от функции reduce() принимает начальное значение. Сводит все элементы потока в один.

val nameFlow: Flow<String> = listOf("Tom", "Bob", "Kate", "Sam", "Alice").asFlow()
nameFlow.fold("Names:") { accumulator, value -> "$accumulator $value" } // Names: Tom Bob Kate Sam Alice

combine

Объединяет два потока в один, после применения к их элементам функции преобразования. Последние элементы потоков всегда объединяются.

flowOf("A", "B")
    .combine(flowOf("1", "2", "3")) { a, b ->
        a + b
    } // A1, B1, B2, B3

combineTransform

Возвращает поток, значения которого генерируются функцией преобразования, обрабатывающей последние переданные значения каждым потоком.

combineTransform(
    flowOf("A", "B", "C"),
    flowOf("1", "2", "3")
) { letter, number ->
    val result = "$letter$number"
    emit(result)
} // A1, B1, B2, C2, C3

drop

Удаляет из потока определенное количество элементов. В качестве параметра принимает количество элементов с начала потока, которые надо убрать.

val nameFlow: Flow<String> = listOf("Tom", "Bob", "Kate", "Sam", "Alice").asFlow()
nameFlow.drop(3) // Sam, Alice

dropWhile

Удаляет из потока элементы, пока они не начнут соответствовать некоторому условию.

val peopleFlow: Flow<Person> = listOf(
    Person("Tom", 37),
    Person("Alice", 32),
    Person("Bill", 5),
    Person("Sam", 14),
    Person("Bob", 25)
).asFlow()

peopleFlow.dropWhile { person -> person.age > 17 } // Bill, Sam, Bob

filter

Фильтрует объекты в потоке.

val peopleFlow: Flow<Person> = listOf(
    Person("Tom", 37),
    Person("Bill", 5),
    Person("Sam", 14),
    Person("Bob", 21)
).asFlow()

peopleFlow.filter { person -> person.age > 17 } // Tom, Bob

filterNot

Фильтрует поток, оставляя те элементы, которые не соответствуют условию.

val peopleFlow: Flow<Person> = listOf(
    Person("Tom", 37),
    Person("Bill", 5),
    Person("Sam", 14),
    Person("Bob", 21)
).asFlow()

peopleFlow.filterNot { person -> person.age > 17 } // Bill, Sam

filterNotNull

Фильтрует поток, удаляя все элементы, которые равны null.

val peopleFlow: Flow<Person?> = listOf(
    Person("Tom", 37),
    null
    Person("Sam", 14),
    null
).asFlow()

peopleFlow.filterNotNull() // Tom, Sam

filterIsInstance

val anyFlow: Flow<Any> = listOf(
    "Alice",
    1,
    "Bob",
    2
).asFlow()

anyFlow.filterIsInstance<Int>() // 1, 2

map

Преобразует данные потока. В качестве параметра он принимает функцию преобразования. Функция преобразования принимает в качестве единственного параметра объект из потока и возвращает преобразованные данные.

val peopleFlow: Flow<Person> = listOf(
    Person("Tom", 37),
    Person("Sam", 41),
    Person("Bob", 21),
    Person("Bill", 5)
).asFlow()

peopleFlow.map { person -> person.name } // Tom, Sam, Bob, Bill

peopleFlow
    .map { person -> object {
        val name: String = person.name
        val isAdult: Boolean = person.age > 17
    } }

mapNotNull

Возвращает поток, содержащий только ненулевые результаты применения заданной функции преобразования к каждому значению исходного потока.

val peopleFlow: Flow<Person?> = listOf(
    Person("Tom", 37),
    null,
    Person("Sam", 41),
    null,
    Person("Bob", 21),
    null,
    Person("Bill", 5)
).asFlow()

peopleFlow.mapNotNull { person -> person?.name } // Tom, Sam, Bob, Bill

onEach

Применяет к элементам потока определенную функцию перед тем, как они будут переданы в возвращаемый поток.

val numberFlow: Flow<Int> = flowOf(1, 2, 3)
numberFlow.onEach { delay(1000L) }

take

Ограничивает количество элементов в потоке. В качестве параметра принимает количество элементов с начала потока, которые надо оставить.

val nameFlow: Flow<String> = listOf("Tom", "Bob", "Kate", "Sam", "Alice").asFlow()
nameFlow.take(3) // Tom, Bob, Kate

takeWhile

Выбирает из потока элементы, пока будет истино некоторое условие.

val peopleFlow: Flow<Person> = listOf(
    Person("Tom", 37),
    Person("Alice", 32),
    Person("Bill", 5),
    Person("Sam", 14),
    Person("Bob", 25)
).asFlow()

peopleFlow.takeWhile { person -> person.age > 17 } // Tom, Alice

transform

Выполняем преобразование объектов в потоке. В отличие от map позволяет использовать функцию emit(), чтобы передавать в поток произвольные объекты.

val peopleFlow: Flow<Person> = listOf(
    Person("Tom", 37),
    Person("Bill", 5),
    Person("Sam", 14),
    Person("Bob", 21)
).asFlow()

peopleFlow
    .transform { person ->
        if (person.age > 17) {
            emit(person.name)
        }
    } // Tom, Bob

val numberFlow: Flow<Int> = listOf(2, 3, 4).asFlow()
numberFlow
    .transform { number ->
        emit(number)
        emit(number * number)
    } // 2, 4, 3, 9, 4, 16

transformLatest

Возвращает поток, который создает элемент с помощью функции преобразования каждый раз, когда исходный поток выдает значение. Когда исходный поток выдает новое значение, предыдущий блок преобразования отменяется.

flow {
    emit("A")
    delay(100)
    emit("B")
}.transformLatest { value ->
    emit(value)
    delay(200)
    emit(value + "_last")
} // A, B, B_last

transformWhile

Применяет функцию преобразования к каждому значению данного потока, пока эта функция возвращает значение true.

zip

Позволяет объединить 2 потока данных. Оператор zip принимает два параметра. Первый параметр - поток данных, с которым надо выполнить объединение. Второй параметр - собственно функция объединения. Она принимает соответствующие элементы обоих потоков в качестве параметров и возвращает результат их объединения.

val nameFlow: Flow<String> = listOf("Tom", "Bob", "Sam").asFlow()
val ageFlow: Flow<Int> = listOf(37, 41, 25).asFlow()
nameFlow
    .zip(ageFlow) { name, age -> Person(name, age) }
    .collect { person -> println("Name: ${person.name}, Age: ${person.age}") }

debounce

Не позволяет выполнить операцию, пока не истечет установленный таймер. Удобно для полей ввода текста. После получения события запускается таймер. Если новое событие приходит, когда таймер активен - таймер перезапускается. Если таймер истек, мы испускаем последний испущенный элемент.

flow {
    emit(1)
    delay(90)
    emit(2)
    delay(90)
    emit(3)
    delay(1010)
    emit(4)
    delay(1010)
    emit(5)
}.debounce(1000) // 3, 4, 5

sample

Подобно debounce используется для фильтрации элементов потока, но имеет отличие - вместо проверка интервала от последнего элемента запускается периодически и отправляет последний элемент за интервал. Например, поток выдает элементы каждые 50 мс, и если применить выборку в 100 мс, получим каждый второй элемент. С оператором debounce получили бы только элемент 9.

flow {
    repeat(10) {
        emit(it)
        delay(50)
    }
}.sample(100) // 1, 3, 5, 7, 9

flatMapMerge

Сопоставляет каждый элемент потока с новым потоком, представленным операцией преобразования, а затем объединяет элементы этих потоков и сглаживает его.

flowOf(1, 2, 3)
    .flatMapMerge { number: Int ->
        flowOf("$number A", "$number B")
    } // 1A, 1B, 2A, 2B, 3A, 3B

flatMapConcat

Подобен flatMapMerge, но здесь потоки объединяются, а не сливаются. Важно: flatmapMerge имеет лучшую производительность, поскольку может обрабатывать потоки параллельно.

flowOf(1, 2, 3)
    .flatMapConcat { number: Int ->
        flowOf("$number A", "$number B")
    } // 1A, 1B, 2A, 2B, 3A, 3B

flatMapLatest

Возвращает поток, который переключается на новый поток, создаваемый функцией преобразования, каждый раз, когда исходный поток выдает значение. Когда исходный поток выдает новое значение, предыдущий поток, созданный блоком преобразования, отменяется.

flow {
    emit("A")
    delay(100)
    emit("B")
}.flatMapLatest { value ->
    flow {
        emit(value)
        delay(200)
        emit(value + "_last")
    }
} // A, B, B_last

buffer

Не меняет поток элементов, а помогает с производительностью. Если мы добавим буфер между операторами onEach и collect, он создаст отдельную сопрограмму для параллельного сбора элементов из оператора onEach в соответствии с предоставленной емкостью буфера. Это позволит нам собрать все элементы за 300 мс, так как все операторские операции не должны выполняться последовательно.

flowOf("A", "B", "C")
    .onEach  { delay(300) } // Output takes 900ms as all items are processed sequentially

flowOf("A", "B", "C")
    .onEach  { delay(300) }
    .buffer(3) // Output takes 300ms as all items are processed in parallel at buffer operator
               // and delivered to collect immediately.

onEmpty

Вызывает действие, если поток завершается без испускания элементов.

val nameFlow: Flow<String> = flowOf()
nameFlow.onEmpty {
    emit("Bob")
    emit("Alice")
} // Bob, Alice

conflate

В случае нескольких вызовов emit равно обработает все, но соберет только последний вызов.

StateFlow

StateFlow

Hot flow. Это специализация SharedFlow. Доставляет последнее событие только новым подписчикам
• есть возможность синхронно получить value.
• сравнивает предыдущее значение с новым. игнорирует если одинаковые.

MutableStateFlow

Мутабельный StateFlow.

asStateFlow

Конвертирует мутабельный MutableStateFlow в иммутабельный StateFlow (только для чтения).

stateIn

Преобразует холодный поток в горячий. Возвращает оператор StateFlow. Полезен в ситуациях, когда имеется холодный поток, предоставляющий обновления значения некоторого состояния, создание и/или обслуживание которого требует больших затрат, но имеется несколько подписчиков, которым необходимо собирать самое последнее значение состояния.

val stateFlow: StateFlow<Int> = flow
    .stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5_000),
        initialValue = 0
    )

update

Метод для обновления данных в MutableStateFlow.

data class Person(
    val name: String
)

private val personFlow = MutableStateFlow(Person("John"))

personFlow.update {
    it.copy(name = "Vlad")
}

collectAsState

Используется в Compose. Предоставляет значение этого StateFlow через State. Начальное значение - StateFlow.value. Каждый раз когда в StateFlow будет отправляться новое значение будет вызываться рекомпозиция.

private val _numberFlow: MutableStateFlow<Int> = MutableStateFlow(42)
val numberFlow: StateFlow<Int> = _numberFlow.asStateFlow()

val number: Int by viewModel.numberFlow.collectAsState()




collectAsStateWithLifecycle

Предоставляет значение этого StateFlow через State учитывая жизненный цикл. Каждый раз когда в StateFlow будет отправляться новое значение будет вызываться рекомпозиция если жизненный цикл lifecycleOwner равен хотя бы minActiveState (Lifecycle.State.STARTED). Сбор данных останавливается если жизненный цикл падает ниже минимального и перезапускается если наоборот.

private val _numberFlow: MutableStateFlow<Int> = MutableStateFlow(42)
val numberFlow: StateFlow<Int> = _numberFlow.asStateFlow()

val number: Int by viewModel.numberFlow.collectAsStateWithLifecycle()
SharedFlow

SharedFlow

Hot flow. Генерирует события, даже если вы не вызываете collect на нем
• нет возможность синхронно получить value.
• всегда отправляет новые значения, даже если они одинаковые.
• может иметь несколько подписчиков.

MutableSharedFlow

Мутабельный SharedFlow.

asSharedFlow

Конвертирует MutableSharedFlow в иммутабельный SharedFlow (только для чтения).

shareIn

Преобразует холодный поток в горячий. Возвращает оператор SharedFlow.

val sharedFlow: SharedFlow<Int> = flow
    .shareIn(
        scope = viewModelScope,
        started = SharingStarted.Lazily,
        replay = 0
    )
Kotlin Coroutines Flow. Вопросы на собесе
  1. Как можно создавать Flow и какие есть билдеры?
  1. Как обрабатывать ошибки в Flow?
  1. Как работает оператор flowOn?
  1. Как работает оператор collect?
  1. Разница между StateFlow и SharedFlow?

    StateFlow хранит текущее значение и всегда возвращает последнее значение новым подписчикам, обеспечивая сохранение состояния.

    SharedFlow не сохраняет состояние, а просто транслирует эмитированные значения всем подписчикам, без сохранения их в истории.

  1. Разница между холодным и горячим потоками в Kotlin?

    Холодный поток в Kotlin создается с помощью Flow, который начинает эмитировать данные только при подписке. Горячий поток реализуется с помощью SharedFlow или StateFlow, который продолжает эмитировать данные независимо от подписчиков.