专业编程基础技术教程

网站首页 > 基础教程 正文

Scala函数式编程:15 流处理和增量 I/O

ccvgpt 2024-10-12 13:42:59 基础教程 6 ℃


本章涵盖

Scala函数式编程:15 流处理和增量 I/O

  • 检查命令式 I/O 的限制
  • 涵盖流转换的表达方法
  • 引入有效的流计算
  • 解释资源安全

我们在第 4 部分的引言中提到,函数式编程是一个完整的范式。每个可以想象的程序都可以在功能上表达,包括与外部世界交互的程序,但如果IO类型是构建此类程序的唯一方法,那将是令人失望的。IO和ST的工作原理是简单地将命令式编程语言嵌入到Scala的纯功能子集中。在 IO monad 中编程时,我们必须对程序进行推理,就像我们在普通的命令式编程中一样。

我们可以做得更好。在本章中,我们将展示如何恢复本书第1-3部分中开发的高级组合风格,即使是与外部世界交互的程序。这个领域的设计空间是巨大的,我们的目标不是完全探索它,而是传达想法,让你了解什么是可能的。我们将构建一个类似于 Functional Streams for Scala (FS2) 库 (https://fs2.io) 的组合流 API。

15.1 命令式 I/O 的问题:一个例子

我们将首先考虑一个简单的具体使用场景,我们将使用它来强调嵌入在 IO monad 中的命令式 I/O 的一些问题。本章的第一个挑战是编写一个程序来检查文件中的行数是否大于 40,000。

这是一个刻意简单的任务,说明了我们的库要解决的问题的本质。我们当然可以使用 IO monad 中的普通命令式代码来完成此任务。我们先来看一下。

示例 15.1 命令式计算行号

def linesGt40k(filename: String): IO[Boolean] = IO:
  val src = io.Source.fromFile(filename)             ①
  try
    var count = 0
    val lines: Iterator[String] = src.getLines()     ②
    while count <= 40000 && lines.hasNext do
      lines.next                                     ③
      count += 1
    count > 40000
  finally src.close

(1) scala.io.Source 具有方便的功能,可以从外部源(如文件)读取。

(2) 从源中获取有状态迭代器。

(3)有前进到下一个元素的副作用

然后我们可以使用 linesGt40k(“lines.txt”).unsafeRunSync() 运行此 IO 操作,其中 unsafeRunSync 是一种副作用方法,它采用 IO[A],返回 A 并实际执行所需的效果(参见第 13.7.1 节)。

虽然这段代码使用低级原语,比如 while 循环、可变迭代器和 var,但它有一些好处。首先,它是增量的 - 整个文件不会预先加载到内存中。相反,仅在需要时从文件中获取行。如果我们不缓冲输入,我们一次可以在内存中保留一行文件。它也会提前终止——一旦知道答案。

这段代码也有一些不好的地方。首先,我们必须记住在完成后关闭文件。这似乎很明显,但是如果我们忘记这样做,或者(更常见的是)如果我们在 finally 块之外关闭文件并且首先发生异常,该文件将保持打开状态。1 这称为资源泄漏。文件句柄是稀缺资源的一个示例,因为操作系统在任何给定时间只能打开有限数量的文件。如果此任务是较大程序的一部分(假设我们正在递归扫描整个目录,建立超过 40,000 行的所有文件的列表),则较大的程序很容易失败,因为打开的文件太多。

我们希望编写资源安全的程序。它们应在不再需要文件句柄时立即关闭它们(无论是由于正常终止还是异常);他们不应尝试从关闭的文件中读取;它们应该对其他资源(如网络套接字、数据库连接等)执行相同的操作。直接使用 IO 可能会有问题,因为这意味着我们的程序完全负责确保自己的资源安全,而我们没有得到编译器的帮助来确保他们这样做。如果我们的图书馆能够通过构建确保程序的资源安全,那就太好了。

但即使抛开资源安全的问题,这段代码也有一些不令人满意的地方。它将高级算法与有关迭代和文件访问的低级问题纠缠在一起。当然,我们必须从某个资源中获取元素,处理发生的任何错误,并在完成后关闭资源,但是我们的程序与任何这些事情无关。相反,它是关于计算元素并在我们达到 40,000 时立即返回一个值,这发生在所有这些 I/O 操作之间。将算法和 I/O 问题交织在一起不仅丑陋,而且是组合的障碍,我们的代码以后很难扩展。若要了解这一点,请考虑原始方案的一些变体:

  • 检查文件中非空行数是否超过40,000。
  • 查找 40,000 之前的行索引,其中连续行的首字母拼写出“abracadabra”。

对于第一种情况,我们可以想象将一个 String => 布尔值传递到我们的 linesGt40k 函数中。但是对于第二种情况,我们需要修改我们的循环以跟踪一些进一步的状态。除了更丑陋之外,生成的代码可能很难正确。通常,在 IO monad 中编写高效代码意味着编写整体循环,而整体循环不可组合。

让我们将此与我们正在分析的行的 LazyList[String] 的情况进行比较:

lines.zipWithIndex.exists(_(1) + 1 >= 40000)

好多了!使用 LazyList ,我们可以从预先存在的组合器组装我们的程序, zipWithIndex 并且存在 .如果我们只想考虑非空行,那么我们可以很容易地使用 filter :

lines.filter(!_.trim.isEmpty).zipWithIndex.exists(_(1) + 1 >= 40000)

对于第二种情况,我们可以将 LazyList 2 上定义的 indexOfSlice 函数与 take(在 40,000 行后终止搜索)和 map(提取每行的第一个字符)结合使用:

lines
  .filter(!_.trim.isEmpty)
  .take(40000)
  .map(_.head)
  .indexOfSlice("abracadabra".toList)

我们希望在从实际文件中读取时编写类似于前面的内容。问题是我们没有 懒惰列表[字符串] ;我们有一个可以从中读取的文件。我们可以通过编写一个名为 lines 的函数来作弊,该函数返回 IO[LazyList[String]] :

def lines(filename: String): IO[LazyList[String]] = IO:
  val src = io.Source.fromFile(filename)
  src.getLines().to(LazyList).append {
    src.close
    LazyList.empty
  }

这称为延迟 I/O。我们在作弊,因为 IO monad 中的 LazyList[String] 实际上并不是一个纯值。当流的元素被强制时,它将执行从文件读取的副作用,只有当我们检查整个流并到达其末尾时,我们才会关闭文件。虽然惰性 I/O 很有吸引力,因为它让我们在一定程度上恢复了组合风格,但它存在问题,原因如下:

  • 它不是资源安全的。仅当我们遍历到流的末尾时,才会释放资源(在本例中为文件)。但是我们经常希望提前终止遍历(在这里,一旦找到匹配项,存在就会停止遍历 LazyList),我们当然不希望每次这样做时都泄漏资源。
  • 没有什么能阻止我们穿越同样的东西关闭文件后再次LazyList。这将导致以下两种情况之一,具体取决于 LazyList 是否在强制后记住(缓存)其元素。如果它们被记住,我们将看到过多的内存使用,因为所有元素都将保留在内存中。如果未记住它们,则再次遍历流将导致从关闭的文件句柄读取。
  • 由于强制流元素具有 I/O 副作用,因此两个线程遍历LazyList 同时可能导致不可预知的行为。
  • 在更现实的场景中,我们不一定完全了解懒惰列表[字符串] .它可以传递给一些我们无法控制的函数,这些函数可能会在检查它之前将其存储在数据结构中很长时间。现在,正确的用法需要一些带外知识:我们不能像操纵典型的纯值一样操作这个 LazyList[String]——我们必须了解它的起源。这对组合不利,在组合中,除了其类型之外,我们不必知道任何关于值的信息。

15.2 简单流转换

在执行 I/O 时,我们从 LazyList 和 List 中恢复我们习惯的高级样式的第一步是引入流处理器的概念。流处理器指定从一个流到另一个流的转换。我们在这里通常使用术语来指代一个序列,可能是由外部源延迟生成或提供的。这可以是文件中的行流、HTTP 请求、鼠标单击位置或其他任何内容。让我们考虑一个简单的数据类型, 拉动 ,它允许我们表达流转换。3

示例 15.2 拉取数据类型

enum Pull[+O, +R]:
  case Result[+R](result: R) extends Pull[Nothing, R]
  case Output[+O](value: O) extends Pull[O, Unit]
  case FlatMap[X, +O, +R](
    source: Pull[O, X],
    f: X => Pull[O, R]) extends Pull[O, R]
 
  def step: Either[R, (O, Pull[O, R])] = this match            ①
    case Result(r) => Left(r)
    case Output(o) => Right(o, Pull.done)
    case FlatMap(source, f) =>
      source match
        case FlatMap(s2, g) =>
          s2.flatMap(x => g(x).flatMap(y => f(y))).step
        case other => other.step match
          case Left(r) => f(r).step
          case Right((hd, tl)) => Right((hd, tl.flatMap(f)))
 
  @annotation.tailrec
  final def fold[A](init: A)(f: (A, O) => A): (R, A) =         ②
    step match
      case Left(r) => (r, init)
      case Right((hd, tl)) => tl.fold(f(init, hd))(f)
 
  def toList: List[O] =                                        ③
    fold(List.newBuilder[O])((bldr, o) => bldr += o)(1).result
 
  def flatMap[O2 >: O, R2](
    f: R => Pull[O2, R2]
  ): Pull[O2, R2] =                                            ④
    Pull.FlatMap(this, f)
 
  def >>[O2 >: O, R2](next: => Pull[O2, R2]): Pull[O2, R2] =
    flatMap(_ => next)
 
  def map[R2](f: R => R2]): Pull[O, R2] =
    flatMap(r => Result(f(r)))

(1) 在重写左嵌套平面映射调用时解释拉取

(2) 逐步拉动,直到产生最终结果,累积输出值

(3) 折叠拉取,将所有输出元素收集到单个列表中,并丢弃 R 值

(4) 操作拉动的最终结果值,而不是每个输出值

Pull[O, R] 输出任意数量的 O 类型的值,然后以 R 类型的单个值终止。拉取可以通过步骤进行增量评估,输出最终值R或输出值O和代表输入流其余部分的新拉取。有了这个模型,我们可以通过使用flatMap对Pull.Output 调用进行排序,从各种数据源构建任意复杂的拉取。当我们想要运行最终的流转换时,我们可以使用 fold ,它反复步进拉取直到终止。

特别值得注意的是平面映射操作。flatMap 不是将每个输出值转换为新的拉取值并连接所有结果,而是对类型 R 的最终结果值进行操作。这意味着我们可以使用派生的 >> 方法连接两个拉取的值。我们一会儿再看这个。

拉取类型在输出类型和结果类型中都是协变的。这种协方差允许我们将 Result[+R] 定义为拉动[无,R],将输出 [+O] 定义为拉动[O,单位]。这些子类型关系类似于 None 是一个选项[Nothing],Nil 是一个列表 [Nothing]。因此,我们得到了一个更好地与类型推断配合使用的 API。稍后我们将更深入地了解类型推断。

15.2.1 创建拉取

我们可以创建的最简单的拉取之一是不输出任何值并以单位终止的拉取:

val done: Pull[Nothing, Unit] = Result(())

我们可以构造输出单个值并连接它们的拉取:

scala> val p = Pull.Output(1) >> Pull.Output(2)
val p: Pull[Int, Unit] = FlatMap(Output(1),...)
 
scala> val q = p.toList
val q: List[Int] = List(1, 2)

p1 >> p2 操作是编写 p1.flatMap(_ => p2) 的简写。

我们可以将递归与 done 和 >> 一起从列表和惰性列表中构造拉取:

def fromList[O](os: List[O]): Pull[O, Unit] =
  os match
    case Nil => done
    case hd :: tl => Output(hd) >> fromList(tl)
 
def fromLazyList[O](os: LazyList[O]): Pull[O, Unit] =
  os match
    case LazyList() => done
    case hd #:: tl => Output(hd) >> fromLazyList(tl)

在fromList和fromLazyList中,我们使用递归构造一个拉取。我们将每个输入元素转换为输出拉取,然后使用 >> 延迟递归。这是堆栈安全的,因为>>懒惰地接受它唯一的参数。这类似于我们在第 5 章中使用惰性求值来构造惰性列表的方式,但这里我们依靠 flatMap 的惰性来延迟尾部的求值。让我们将此递归模式推广到更高级别的拉取构造函数中:

def unfold[O, R](init: R)(f: R => Either[R, (O, R)]): Pull[O, R] =
  f(init) match
    case Left(r) => Result(r)
    case Right((o, r2)) => Output(o) >> unfold(r2)(f)

展开从 R 类型的初始种子值和迭代函数开始,该函数被反复调用,产生 R 的最终结果或 O 的输出和 R 的新种子。这基本上与我们在练习 5.11 中为 LazyList 定义的操作相同。


练习 15.1

——————————————————————————————

实现 fromListViaUnfold 和 fromLazyListViaUnfold 作为 Pull 伴侣对象上的方法,使用 unfold :

def fromListViaUnfold[O](os: List[O]): Pull[O, Unit]
def fromLazyListViaUnfold[O](os: LazyList[O]): Pull[O, Unit]

就像我们对 LazyList 所做的那样,我们可以创建一个连续的构造函数:

def continually[A](a: A): Pull[A, Nothing] =
  Output(a) >> continually(a)

连续的返回类型很有趣——结果类型是 Nothing,因为拉取是无限的,因此永远不会在值中终止。我们可以再次将递归提取到更通用的组合器中:

enum Pull[+O, +R]:
  def repeat: Pull[O, R] =
    this >> repeat
 
object Pull:
  def continually[A](a: A): Pull[O, Nothing] =
    Output(a).repeat
 


练习 15.2

——————————————————————————————

将迭代定义为 Pull 伴随对象上的方法。它应该返回一个无限流,该流首先输出提供的初始值,然后输出将该值应用于提供的函数的结果,依此类推:

def iterate[O](initial: O)(f: O => O): Pull[O, Nothing]

到目前为止,我们还没有办法部分评估拉动。部分求值对于无限拉动特别有用,例如从连续、重复和迭代返回的那些。让我们在 Pull 上定义 take(n),它会评估拉取,直到生成指定数量的输出值:

def take(n: Int): Pull[O, Option[R]] =
  if n <= 0 then Result(None)
  else step match
    case Left(r) => Result(Some(r))
    case Right((hd, tl)) => Output(hd) >> tl.take(n - 1)

当 n 小于或等于零时,take 返回结果(无)。否则,我们分步拉动;如果我们已经到达了输入的末尾,那么我们返回包裹在 Some 中的源的最终结果。否则,我们得到一个输出值,我们输出,以及一个余数拉动,我们递归地称之为take,减少元素的数量。

不过,这种实现有一个微妙之处:当 n 为正时,我们急切地步进源拉取。这种急切的评估可能会令人惊讶,因为评估目前仅在折叠拉取时发生(或使用操作,例如 toList ,它是建立在折叠上的),所以让我们通过 Pull 上的新组合器延迟此评估:

def uncons: Pull[Nothing, Either[R, (O, Pull[O, R])]] =
  Pull.done >> Result(step)
 
def take(n: Int): Pull[O, Option[R]] =
  if n <= 0 then Result(None)
  else uncons.flatMap:
    case Left(r) => Result(Some(r))
    case Right((hd, tl)) => Output(hd) >> tl.take(n - 1)

uncons 操作将步骤包装在结果构造函数中,并通过 flatMap 延迟其创建。在 take 的实现中,不是匹配 step 的结果,我们现在对 uncons 的结果进行平面映射。有了这个定义,我们可以制作部分评估的无限流:

scala> val ints = Pull.iterate(0)(_ + 1)
val ints: Pull[Int, Nothing] = FlatMap(Output(0),...)
 
scala> val firstFive = ints.take(5).toList
val firstFive: List[Int] = List(0, 1, 2, 3, 4)


练习 15.3

——————————————————————————————

在拉动时实现 drop 、takeWhile 和 dropWhile :

def drop(n: Int): Pull[O, R]
def takeWhile(p: O => Boolean): Pull[O, Pull[O, R]]
def dropWhile(p: O => Boolean): Pull[Nothing, Pull[O, R]]

我们可以通过将递归函数与 uncons 相结合来构建许多有趣的拉动转换。以下是我们如何转换拉取的输出元素:

def mapOutput[O2](f: O => O2): Pull[O2, R] =
  uncons.flatMap:
    case Left(r) => Result(r)
    case Right((hd, tl)) => Output(f(hd)) >> tl.mapOutput(f)

我们可以使用相同的方法来添加或删除拉动的元素。下面是一个基于谓词过滤输出元素的组合器:

def filter(p: O => Boolean): Pull[O, R] =
  uncons.flatMap:
    case Left(r) => Result(r)
    case Right((hd, tl)) =>
      (if p(hd) then Output(hd) else Pull.done) >> tl.filter(p)

我们甚至可以通过将状态作为参数传递给递归函数来维护拉取转换中的状态:

def count: Pull[Int, R] =
  def go(total: Int, p: Pull[O, R]): Pull[Int, R] =
    p.uncons.flatMap:
      case Left(r) => Result(r)
      case Right((_, tl)) =>
        val newTotal = total + 1
        Output(newTotal) >> go(newTotal, tl)
  Output(0) >> go(0, this)


练习 15.4

——————————————————————————————

在 Pull 上实现一个方法,该方法输出所有输入的运行计数,使用幺半群组合元素。对于每个输入元素,将其与累积计数相结合,输出结果:

def tally[O2 >: O](using m: Monoid[O2]): Pull[O2, R]
 
scala> val t = ints.tally.take(10).toList
val t: List[Int] = List(0, 0, 1, 3, 6, 10, 15, 21, 28, 36)


练习 15.5

——————————————————————————————

在 Pull[Int, R] 上实现一个扩展方法,该方法将最后 n 个值的平均值输出为 Pull[Double, R] :

extension [R](self: Pull[Int, R])
  def slidingMean(n: Int): Pull[Double, R]

正如我们在本书中多次看到的那样,当我们在定义一系列函数时注意到常见的模式时,我们可以将这些模式分解到通用组合器中。计数、计数和滑动均值函数都有一个共同的模式:每个函数都有一个状态片段,有一个状态转换函数,该函数根据输入更新此状态,并产生单个输出。我们可以将其推广到组合器,我们称之为mapAccumulate:

def mapAccumulate[S, O2](init: S)(f: (S, O) => (S, O2)): Pull[O2, (S, R)] =
  uncons.flatMap:
    case Left(r) => Result((init, r))
    case Right((hd, tl)) =>
      val (s, out) = f(init, hd)
      Output(out) >> tl.mapAccumulate(s)(f)


练习 15.6

——————————————————————————————

重新实现计数、计数和滑动平均值,使用 mapAccumulate 而不是显式递归。


拉莫纳德

flatMap 操作与结果构造函数一起构成了 [x] =>> Pull[O, x] 的 monad 实例:

given [O]: Monad[[x] =>> Pull[O, x]] with
  def unit[A](a: => A): Pull[O, A] = Result(a)
  extension [A](pa: Pull[O, A])
    def flatMap[B](f: A => Pull[O, B]): Pull[O, B] =
      pa.flatMap(f)

我们可以通过将输出作为单元的实现并提出不同的 flatMap 实现来创建替代的 monad 实例。输出(a)给了我们一个拉[A,单位]——结果类型是单位——所以让我们定义一个Monad[[x] =>> Pull[x, Unit]]。通过将此类型插入到 Monad 的定义并遵循这些类型,我们发现我们需要定义一个具有以下签名的 flatMap 版本:

extension [O](self: Pull[O, Unit])
  def flatMap[O2](f: O => Pull[O2, Unit]): Pull[O2, Unit]

请注意,传递给 flatMap 的函数从原始 Pull 获取输出。这表明我们的 flatMap 实现将类似于 List 和 LazyList 的 monads,其中在每个输出元素上调用提供的函数,并且结果都是连接的。由于我们已经在 Pull 上有一个 flatMap 方法,让我们将此扩展重命名为 flatMapOutput 并实现此 map-then-concat 行为:

extension [O](self: Pull[O, Unit])
  def flatMapOutput[O2](f: O => Pull[O2, Unit]): Pull[O2, Unit] =
    self.uncons.flatMap:
      case Left(()) => Result(())
      case Right((hd, tl)) =>
        f(hd) >> tl.flatMapOutput(f)

我们取消原始拉取,如果我们收到一个输出元素和余数,那么我们将输出元素应用于提供的函数以创建一个 Pull[O2, Unit]。我们返回该拉取,然后在其余部分调用flatMapOutput。此实现具有所需的映射然后连接行为:

scala> val out = ints.flatMapOutput(i =>
                        Pull.fromList(List(i, i))).take(10).toList
val out: List[Int] = List(0, 0, 1, 1, 2, 2, 3, 3, 4, 4)

鉴于此实现,我们可以提供替代的 monad 实例:

val outputMonad: Monad[[x] =>> Pull[x, Unit]] = new:
  def unit[A](a: => A): Pull[A, Unit] = Output(a)
  extension [A](pa: Pull[A, Unit])
    def flatMap[B](f: A => Pull[B, Unit]): Pull[B, Unit] =
      pa.flatMapOutput(f)

这是一个合法的 monad 实例,但使用起来可能很尴尬。Monad[[x] =>> Pull[O, x]] 是一个给定的实例,因此要使用 Monad[[x] =>> Pull[x, Unit]] 实例,我们需要显式引用该实例或以其他方式将其置于比常规实例更高的优先级范围内:

scala> val p = Pull.Result(true).void                 ①
val p: Pull[Nothing, Unit] = ...
 
scala> val q = Pull.Output(1).void                    ②
val q: Pull[Int, Unit] = ...
 
scala> val r = Pull.outputMonad.void(Pull.Output(1))  ③
val r: Pull[Unit, Unit] = ...

(1)void是Monad的一种扩展方法。

(2) 使用 monad[[x] =>> pull[O, x]]

(3) 使用 monad[[x] =>> 拉[x, 单位]]

相反,我们可以为 Pull[O, Unit] 定义一个不透明的类型别名,并将 map-then-concat 行为与该新类型相关联。我们称这种新类型为 Stream ,因为它将拉取的结果固定为 Unit,因此将焦点转移到输出值上:

opaque type Stream[+O] = Pull[O, Unit]
 
object Stream:
  def apply[O](os: O*): Stream[O] =
    Pull.fromList(os.toList).toStream
 
  extension [O](self: Stream[O])
    def toPull: Pull[O, Unit] = self     ①
 
    def fold[A](init: A)(f: (A, O) => A): A =
      self.fold(init)(f)(1)
 
    def toList: List[O] =
      self.toList
 
    def take(n: Int): Stream[O] =
      self.take(n).void
 
    def ++(that: => Stream[O]): Stream[O] =
      self >> that
 
  given Monad[Stream] with
    def unit[A](a: => A): Stream[A] = Pull.Output(a)
    extension [A](sa: Stream[A])
      def flatMap[B](f: A => Stream[B]): Stream[B] =
        sa.flatMapOutput(f)
 
object Pull:
  extension [O](self: Pull[O, Unit])
    def toStream: Stream[O] = self       ②

(1) 将流 [O] 转换为拉动 [O, 单位]

(2) 将拉[O, 单位] 转换为流[O]

除了不透明类型定义和 monad 实例之外,我们还提供了一些额外的 API。toPull 和 toStream 方法允许我们在 流和拉 .我们还提供了一些 API,用于直接处理流,委托给 Pull 上的相应操作,但在必要时调整返回类型。例如,对 Stream 的 take 操作返回一个 Stream[O],丢弃 Pull 上 take 提供的 Option[R] 结果。另一个有趣的例子是 ++ 操作,它使用底层拉取的 monadic 排序连接两个流。

我们是否需要拉取和流?Pull 比 Stream 更强大(Stream 将结果类型固定为 Unit 这一事实暗示了这一点),那么为什么不坚持使用 Pull 而不提供有问题的 monad 实例呢?流通常是更方便的类型,将流计算建模为像LazyList这样的集合,而Pull擅长根据uncons和flatMap定义自定义流计算。在设计 Pull and Stream 的 API 时,我们可以利用这些优势 — 将高级、类似集合的组合器直接添加到 Stream 中,同时添加简化向 Pull 构建新流计算的操作。

15.2.2 组合流转换

Pull 类型允许我们用 monadic 递归来描述任意复杂的数据源,而 Stream 类型为我们提供了一个类似于 API 的集合,该集合对拉取输出的各个元素进行操作。到目前为止,我们定义的各种流转换都是 拉取或流 .我们希望能够定义独立于流本身的流转换,并将多个流转换组合成单个值。我们可以通过函数组合来实现这一点!

type Pipe[-I, +O] = Stream[I] => Stream[O]

管道 [I, O] 是从类型 I 的输入值流到 O 类型的输出值流的函数的类型别名。请注意,Pipe 不会添加任何表现力 — 它只是函数的类型别名。为什么要给它起个名字?这样做可以鼓励我们流库的用户从独立的流转换的角度进行思考。让我们看几个例子:

val nonEmpty: Pipe[String, String] =
  _.filter(_.nonEmpty)
 
val lowerCase: Pipe[String, String] =
  _.map(_.toLowerCase)

非空管道将 Stream[String] 转换为 Stream[String],并删除所有空字符串,小写管道将每个输入字符串转换为小写。在这两种情况下,大部分转换工作都是由高级流组合器(分别为过滤器和映射)完成的。将这些操作提取到值中,我们可以独立地描述它们。此外,我们能够使用函数组合来组合这些值:

val normalize: Pipe[String, String] =
  nonEmpty andThen lowerCase

在这里,我们使用 Scala 函数类型的内置 andThen 操作将这两个转换组合成更高级别的管道。

由于 Pipe 是函数的类型别名,因此我们可以通过直接调用这些函数将管道应用于流。或者,我们可以使用 scala.util.chaining.pipe 实用程序方法,它允许我们将 f(a) 写为 a.pipe(f) .以下内容都是等效的:

val lines: Stream[String] = Stream("Hello", "", "World!")
 
val normalized: Stream[String] = normalize(lines)
 
import scala.util.chaining.scalaUtilChainingOps         ①
val normalized2: Stream[String] = lines.pipe(normalize)
val normalized3: Stream[String] = lines.pipe(nonEmpty).pipe(lowerCase)

(1)提供管道延伸法


练习 15.7

——————————————————————————————

实现存在 。有多种方法可以实现它,因为 exist(_ % 2 == 0)(Stream(1, 3, 5, 6, 7)) 可以生成 Stream(true)(停止并仅产生最终结果)、Stream(false、false、false、true)(停止并产生所有中间结果)或 Stream(false、false、false、true、true)(停止并产生所有中间结果):

def exists[I](f: I => Boolean): Pipe[I, Boolean]

We can now express the core transformation for our line-counting problem as count andThen exists(_ > 40000) , and it’s easy to attach filters and other transformations to our pipeline.

15.2.3 Processing files

Our original problem of answering whether a file has more than 40,000 elements is now easy to solve—but so far we’ve just been transforming pure streams. Luckily, we can just as easily use a file as the source of elements in a Stream , and instead of generating a Stream as the result, we can combine all the outputs of the stream into a single final value.

Listing 15.3 Using Stream with files instead of LazyList

def fromIterator[O](itr: Iterator[O]): Stream[O] =
  Pull.unfold(itr)(itr =>
    if itr.hasNext then Right((itr.next(), itr))
    else Left(itr)
  ).void.toStream
 
def processFile[A](
  file: java.io.File,
  p: Pipe[String, A],
)(using m: Monoid[A]): IO[A] = IO:
  val source = scala.io.Source.fromFile(file)
  try fromIterator(source.getLines).pipe(p).fold(m.empty)(m.combine)
  finally source.close()

processFile 函数打开文件并使用 fromIterator 创建一个表示文件行的 Stream[String]。然后,它将提供的管道应用于该流以获取流[A],然后通过使用Monoid[A]折叠输出将该流[A]减少为单个A。整个计算包装在 IO 中,以确保进程文件保持引用透明。现在,我们可以通过以下方式解决原始问题:

def checkFileForGt40K(file: java.io.File): IO[Boolean] =
  processFile(file, count andThen exists(_ > 40000))(using Monoid.booleanOr)


练习 15.8

——————————————————————————————

编写一个程序,将华氏度读取为文件中的双精度值,每行一个值,通过管道发送每个值以将其转换为华氏度,然后将结果写入另一个文件。程序应忽略输入文件中的空白行以及以 # 字符开头的行。您可以使用函数来摄氏:

def toCelsius(fahrenheit: Double): Double =
  (5.0 / 9.0) * (fahrenheit - 32.0)
 
def convert(inputFile: String, outputFile: String): IO[Unit]

15.3 可扩展拉取和流

我们的拉取和流类型提供了一种描述延迟计算的替代方法,但它们并不比 LazyList 更具表现力。事实上,如果我们将 Pipe[I, O] 定义为 LazyList[I] => LazyList[O] 的别名,我们本可以用类似的方式编写文件转换程序。与其依赖隐藏流的有效源的整体 IO 包装器,我们可以扩充拉取和流的定义以支持对任意效果的评估。让我们参数化“拉取”和“流”,使其能够评估效果以生成结果值或输出值。为此,我们将向 Pull 添加一个额外的数据构造函数:

enum Pull[+F[_], +O, +R]:
  case Result[+R](result: R) extends Pull[Nothing, Nothing, R]
  case Output[+O](value: O) extends Pull[Nothing, O, Unit]
  case Eval[+F[_], R](action: F[R]) extends Pull[F, Nothing, R]
  case FlatMap[+F[_], X, +O, +R](
    source: Pull[F, O, X], f: X => Pull[F, O, R]) extends Pull[F, O, R]

我们添加了一个新的 Eval 数据构造函数,它包装了 F[R] 类型的操作。Eval[F, R] 扩展了 Pull[F, Nothing, R],表示它不输出任何值(因此可以在我们需要 Pull[F, O, R] 的任何地方使用)。我们还将类型参数 +F[_] 添加到 拉取 。该参数是协变的,它允许结果和输出将效应类型固定为无,表示它们不评估任何效应。

拉动上的差异注释

我们选择将 Pull 的所有类型参数定义为协变,这为我们提供了更好的类型推断和更精确的类型。我们可以像这样将每个参数定义为不变:

enum Pull[F[_], O, R]:
  case Result(result: R)
  case Output(value: O)
  case Eval(action: F[R])
  case FlatMap[F[_], X, O, R](
    source: Pull[F, O, X], f: X => Pull[F, O, R]) extends Pull[F, O, R]

这个版本看起来肯定简单得多!不幸的是,它使用起来并不好。请考虑以下示例:

scala> Pull.Output(1)
val res0: Pull[Nothing, Int, Nothing] = Output(1)
 
scala> Pull.Result(true)
val res1: Pull[Nothing, Nothing, Boolean] = Result(true)

到目前为止,这些类型看起来与协变定义中的相同。当我们尝试组合这些值时,就会出现问题。例如,我们应该能够编写 res0 >> res1 ,但这样做会导致以下错误:

scala> res0 >> res1
-- [E007] Type Mismatch Error: -----------------------------------------
1 |res0 >> res1
  |        ^^^^
  |        Found:    (res1 : Pull[Nothing, Nothing, Boolean])
  |        Required: Pull[Nothing, Int, Nothing]

在此示例中,res0 具有固定的 F = 无,O = 整数,R = 无,并且不能与固定的拉取统一 F = 无,O = 无,R = 布尔值。但是,我们可以通过推迟具体类型的应用来避免这种情况:

scala> Pull.Output(1) >> Pull.Result(true)
val res2: Pull[Nothing, Int, Boolean] = Result(true)

在这种情况下,Scala 能够在键入组合表达式时推断出 O = Int 和 R = 布尔值。

带有方差注释的拉取版本不会遇到此问题。总的来说,方差注释给 Pull 的定义增加了很多噪音,正如我们很快就会看到的,Pull 上的方法。但是生成的 API 使用起来要好得多。

让我们看看添加效果参数如何影响 Pull 上各种操作的定义。首先,让我们看一下步骤:前面的定义返回了一个 Both[R, (O, Pull[O, R])]。我们需要修改返回类型以考虑步骤遇到 Eval 节点的情况。我们可以映射在包装的 F[R] 上以将其转换为 Both,将类型 R 的结果放在左侧。这表明该步骤现在需要返回 F[Both[R, (O, Pull[O, R])]。作为这种变化的结果,当我们递归调用 step 时,我们需要用 flatMap 对我们的计算进行排序。但是,我们如何在抽象类型构造函数 F 上进行映射和平面映射?我们可以要求莫纳德[F]:

def step[F2[x] >: F[x], O2 >: O, R2 >: R](
  using F: Monad[F2]                                     ①
): F2[Either[R2, (O2, Pull[F2, O2, R2])]] =
  this match
    case Result(r) => F.unit(Left(r))                    ②
    case Output(o) => F.unit(Right(o, Pull.done))
    case Eval(action) => action.map(Left(_))             ③
    case FlatMap(source, f) =>
      source match
        case FlatMap(s2, g) =>
          s2.flatMap(x => g(x).flatMap(y => f(y))).step
        case other => other.step.flatMap:                ④
          case Left(r) => f(r).step
          case Right((hd, tl)) => F.unit(Right((hd, tl.flatMap(f))))

(1) 效果类型的单体实例

(2)通过单元将纯结果提升为效果类型。

(3) 将动作结果包装在左边。

(4)递归步进,使用flatMap访问第一步的结果。

添加 Eval 构造函数的简单更改对 step 产生了深远的影响!我们为效应类型选取了一个 Monad 约束,并使用了一元递归而不是常规递归来进行序列计算。因此,仅当效果类型具有堆栈安全的flatMap时,step才是堆栈安全的(由于other.step.flatMap(...)调用)。

步骤的签名也变得更加复杂。我们选取了三个类型参数,其边界表明它们是 F、O 和 R 的超类型。如果没有这些类型参数,Scala 抱怨我们试图在不变位置使用协变参数。就像我们在前面的章节中所做的那样,我们可以通过引入具有超类型约束的类型参数来机械地将这些错误转换为工作定义,尽管这样做会变得非常复杂。或者,我们可以将 step 定义为扩展方法,避免需要新的类型参数:

def step(using F: Monad[F]): F[Either[R, (O, Pull[F, O, R])]] =
  self match
    case Result(r) => F.unit(Left(r))
    case Output(o) => F.unit(Right(o, Pull.done))
    case Eval(action) => action.map(Left(_))
    case FlatMap(source, f) =>
      source match
        case FlatMap(s2, g) =>
          s2.flatMap(x => g(x).flatMap(y => f(y))).step
        case other => other.step.flatMap:
          case Left(r) => f(r).step
          case Right((hd, tl)) => F.unit(Right((hd, tl.flatMap(f))))

我们将在本章中使用方法,但如果类型约束变得不堪重负,请随意使用扩展方法定义操作。这样做通常更简单,以换取某种类型推理的损失。

fold 和 toList 方法需要类似的处理,因为 fold 调用递归地步进,toList 调用折叠:

def fold[F2[x] >: F[x], R2 >: R, A](init: A)(f: (A, O) => A)(
  using F: Monad[F2]
): F2[(R2, A)] =
  step.flatMap:
    case Left(r) => F.unit((r, init))
    case Right((hd, tl)) => tl.fold(f(init, hd))(f)
 
def toList[F2[x] >: F[x]: Monad, O2 >: O]: F2[List[O2]] =
  fold(List.newBuilder[O])((bldr, o) => bldr += o).map(_(1).result)

Fold 不能再用 tailrec 注释进行注释,因为它现在使用 monadic 递归而不是常规递归。同样,我们依靠效果monad的平面图来确保堆栈安全。

uncons操作更有趣。回想一下之前的定义:

def uncons: Pull[Nothing, Either[R, (O, Pull[O, R])]] =
  Pull.done >> Result(step)

我们可以通过创建一个包装对步骤的调用的 Eval 节点来使其适应有效的拉取版本:

def uncons[F2[x] >: F[x], O2 >: O, R2 >: R](
  using Monad[F2]
): Pull[F2, Nothing, Either[R2, (O2, Pull[F2, O2, R2])]] =
  Eval(step)

由于我们调用了 step,我们被迫在效果类型上添加一个 monad 约束,这样做意味着我们不能再在无效拉取(即 F = Nothing 的拉动)上调用 uncons,因为没有 Monad[Nothing] 实例。类似地,每个使用uncons的操作,如take,也需要携带Monad[F]约束。

为了解决这个问题,我们可以推迟对步骤的调用,直到评估结果拉取。让我们创建一个新的 Uncons 数据构造函数,用于存储对源拉取的引用。然后让我们增加 step 的定义来处理这个构造函数:

case Uncons(source: Pull[F, O, R])
  extends Pull[F, Nothing, Either[R, (O, Pull[F, O, R])]]
 
def step[F2[x] >: F[x], O2 >: O, R2 >: R](
  using F: Monad[F2]
): F2[Either[R2, (O2, Pull[F2, O2, R2])]] =
  this match
    case Result(r) => F.unit(Left(r))
    case Output(o) => F.unit(Right(o, Pull.done))
    case Eval(action) => action.map(Left(_))
    case Uncons(source) =>
      source.step.map(s => Left(s.asInstanceOf[R2]))
    case FlatMap(source, f) =>
      source match
        case FlatMap(s2, g) =>
          s2.flatMap(x => g(x).flatMap(y => f(y))).step
        case other => other.step.flatMap:
          case Left(r) => f(r).step
          case Right((hd, tl)) => F.unit(Right((hd, tl.flatMap(f))))

Uncons 的定义很有趣——它扩展了 Pull[F, Nothing, Both[R, (O, Pull[F, O, R])]] 并引用了底层的 Pull[F, O, R]。该类型表示,取消 Pull[F, O, R] 会导致拉取,该拉取可以评估 F 中的效果,不输出任何元素,并导致源拉取的最终值 R 或输出值 O 和余数拉动 [F, O, R]。

当我们在实现 step 时遇到 Uncons 节点时,我们步进源拉取并将其结果包装在 Left 中。我们需要一个 Monad[F2] 实例来执行此操作,并且 step 已经在范围内具有该实例。

将 Pull port 上的其余操作转换为有效版本,几乎没有变化 — 通常只需要进行调整即可传递效果的类型参数。有关更多示例,请参阅章节代码。5 溪流和管道怎么样?

opaque type Stream[+F[_], +O] = Pull[F, O, Unit]

与 Pull 一样,Stream 为效果类型选取一个新的协变类型参数。为 Stream 定义的各种构造函数和方法都需要修改以考虑这个额外的类型参数,但由于所有繁重的工作都是由 Pull 执行的,这是一个完全机械的转换:

object Stream:
  def empty: Stream[Nothing, Nothing] = Pull.done        ①
 
  def apply[O](os: O*): Stream[Nothing, O] =             ②
    fromList(os.toList)
 
  def fromList[O](os: List[O]): Stream[Nothing, O] =
    os match
      case Nil => Pull.done
      case hd :: tl => Pull.Output(hd) >> fromList(tl)
 
  extension [F[_], O](self: Stream[F, O])
    def toPull: Pull[F, O, Unit] = self
 
    def fold[A](
      init: A)(f: (A, O) => A)(using Monad[F]): F[A] =   ③
      self.fold(init)(f).map(_(1))
 
    def toList(using Monad[F]): F[List[O]] =             ④
      self.toList

(1) 返回一个流[无,无],指示既没有效果评估也没有输出

(2) 返回一个 Stream[Nothing, O],表示没有效果评估

(3) 返回类型现在为 F[A]。

(4) 返回类型现在为 F[列表[O]]。

与 Pull 上的 fold 和 toList 一样,Stream 上的等效操作现在返回有效的操作。我们说 fold 和 toList 是 Stream 类型的消除器——将流的代数解释或编译为目标 monad。

我们还能将这些消除器与无效流一起使用吗?是的,但这有点尴尬。我们需要显式传递一个适当的 monad 实例:

scala> val x = Stream(1, 2, 3).
         repeat.                  ①
         take(5).
         toList(using Monad.tailrecMonad).
         result
val x: List[Int] = List(1, 2, 3, 1, 2)

(1)重复并委派给他们的拉对方。有关完整定义,请参阅章节代码。

在此示例中,我们使用了 TailRec monad,因此整个流计算是堆栈安全的。我们需要显式传递 monad 实例(或以其他方式将我们的流类型归因于 Stream[TailRec, Int]),并且我们需要通过在 toList 的结果上调用 result 来运行蹦床。我们可以为每个消除器提供无效版本,以避免此样板:

extension [O](self: Stream[Nothing, O])
  def fold[A](init: A)(f: (A, O) => A): A =
    self.fold(init)(f)(using Monad.tailrecMonad).result(1)
 
  def toList: List[O] =
    self.toList(using Monad.tailrecMonad).result
 
scala> val s = Stream(1, 2, 3).repeat.take(5)
val s: Stream[Nothing, Int] = ...
 
scala> val x = s.toList                      ①
val x: List[Int] = List(1, 2, 3, 1, 2)
 
scala> val y = (s: Stream[IO, Int]).toList   ②
val y: IO[List[Int]] = ...

(1) 当 F = Nothing 时调用 toList[O]。

(2) 当 F = IO 时调用 toList 导致 IO[O]。

让我们向 Stream 添加一些与效果一起使用的新构造函数和组合器。


练习 15.9

——————————————————————————————

在 Stream 伴随对象上实现 eval 构造函数:

object Stream:
  def eval[F[_], O](fo: F[O]): Stream[F, O]


练习 15.10

——————————————————————————————

在流类型上实现 mapEval 扩展方法:

extension [F[_], O](self: Stream[F, O])
  def mapEval[O2](f: O => F[O2]): Stream[F, O2]


练习 15.11

——————————————————————————————

在 Stream 和 Pull 伴随对象上实现 unfoldEval:

object Stream:
  def unfoldEval[F[_], O, R](
    init: R)(f: R => F[Option[(O, R)]]): Stream[F, O]
 
object Pull:
  def unfoldEval[F[_], O, R](
    init: R)(f: R => F[Either[R, (O, R)]]): Pull[F, O, R]

最后,让我们更新 Pipe 以考虑效果:

type Pipe[F[_], -I, +O] = Stream[F, I] => Stream[F, O]

现在,我们有了管道类型的设计选择。我们选择在 Pipe 的定义中添加单个类型参数 F[_],该参数指定输入流和输出流的效果类型。通过在输入和输出中都使用此类型参数,我们被迫一律地定义参数(尝试协变定义它,看看编译器抱怨什么)。然而,在这样做时,我们放弃了一点一般性。相反,我们可以像这样定义 Pipe:

type Pipe[-F[_], -I, +G[_], +O] = Stream[F, I] => Stream[G, O]

虽然这个定义更具表现力,但它也更复杂。由于 Pipe 只是一个类型别名,因此在需要额外表现力的情况下,我们始终可以使用显式函数类型。因此,我们将坚持使用更简单的定义。

15.3.1 有效的流计算

现在,拉取、流和管道支持对任意效应的评估,我们可以构建计算,以增量方式从源获取数据,对其进行转换,并将结果发送到接收器。如果源由流表示,转换由管道表示,我们如何表示接收器?

事实证明,我们对水槽不需要任何特殊支持!事实上,练习 15.10 中的 mapEval 方法为有效汇提供了基础。我们采用的约定是,接收器是将输出更改为“单位”或“无”的管道,实质上是使用源流中的每个输出值。请考虑以下示例接收器:

def log[O]: Pipe[IO, O, Unit] = _.mapEval(o => IO(println(o)))
 
def drain[F[_], O]: Pipe[F, O, Nothing] = _.flatMap(_ => Stream.empty)

日志接收器使用 mapEval 将源的每个输出值打印到控制台。漏极灌电流丢弃源的所有输出元件。将所有部分放在一起,我们可以描述一个有效的流计算:

case class Message(id: UUID, timestamp: Instant, ...)
 
def format(m: Message): String = s"id: $id, timestamp: $timestamp, ..."
 
val dequeue: IO[Message] = ...
 
val logAll: Stream[IO, Unit] =
  Stream.eval(dequeue).repeat.map(format).pipe(log)

在此示例中,取消排队操作将一直阻塞,直到消息可用(可能来自分布式消息队列)。logAll 操作重复评估取消排队,对于收到的每条消息,将消息格式化为字符串并将结果记录到控制台。整个流计算具有流 [IO, Unit] 类型。我们如何运行此流?我们唯一的消除器是 折叠和列表 .调用toList将是一个问题,因为每个日志操作都会输出一个单位值,该单位值将在列表生成器中累积,最终使用我们所有的堆。相反,让我们折叠并丢弃每个输出值。这在使用有效的流时经常出现,因此我们可以创建一个新的消除器:

extension [F[_], O](self: Stream[F, O])
  def run(using Monad[F]): F[Unit] =
    fold(())((_, _) => ()).map(_(1))
 
val program: IO[Unit] = logAll.run

15.3.2 处理错误

某些效应类型提供处理评估期间发生的错误的功能。第 13 章的章节代码将 Task[A] 类型定义为围绕 IO[Try[A]] 的不透明类型。任务允许我们忽略潜在错误的存在,直到我们想要处理它们:

val a: Task[Int] = Task("asdf".toInt)
val b: Task[Int] = a.map(_ + 1)
val c: Task[Try[Int]] = b.attempt
val d: Try[Int] = c.unsafeRunSync(pool)

评估分配给 的任务时,从字符串到整数的转换失败,并显示 NumberFormatException 。任务处理该异常并将其内部包装存储在失败中(回想一下,失败是 Try 的数据构造函数)。在 b 的定义中,映射调用最终是无操作。在 c 的定义中,对尝试的调用暴露了底层的 Try[Int]。

Task 将错误通道添加到 IO — 用于传递通过计算发生的错误的管道,其方式与 Try 和 Both(执行)提供无效计算的方式大致相同。我们可以定义一个 monad[Task] 实例,它的工作方式就像 Try 和 要么 的 monad 实例一样,在遇到故障时短路。Task 还提供了 handleErrorWith 组合器,派生自 try 和 flatMap:

def handleErrorWith(h: Throwable => Task[A]): Task[A] =
  attempt.flatMap:
    case Failure(t) => h(t)
    case Success(a) => Task(a)
 
val a: Task[Int] = Task("asdf".toInt)
val b: Task[Int] = a.map(_ + 1)
val c: Task[Int] = b.handleErrorWith(_ => Task(0))

handleErrorWith 有点类似于 flatMap,因为它提供排序,但提供错误通道而不是结果通道。当我们使用任务与流时会发生什么?我们可以像使用 IO 一样使用 eval:

val s: Stream[Task, Int] = Stream.eval(Task("asdf".toInt))

当我们通过其中一个流消除器(例如,toList 或 run )将此流转换为任务时,我们最终会以包装 NumberFormatException 的失败告终。能够将此类错误作为流和拉取 API 的一部分来处理非常有用,因此让我们为每个错误添加 handleErrorWith:

enum Pull[+F[_], +O, +R]:
  case Handle(
    source: Pull[F, O, R],
    handler: Throwable => Pull[F, O, R]
  ) extends Pull[F, O, R]
 
  def handleErrorWith[F2[x] >: F[x], O2 >: O, R2 >: R](
    handler: Throwable => Pull[F2, O2, R2]): Pull[F2, O2, R2] =
    Pull.Handle(this, handler)
 
extension [F[_], O](self: Stream[F, O])
  def handleErrorWith(handler: Throwable => Stream[F, O]): Stream[F, O] =
    Pull.Handle(self, handler)

我们向 Pull 添加了一个新的构造函数,用于捕获原始拉取和错误处理程序。然后,我们需要扩充 step 的定义来解释这个 Handle 构造函数。当单步执行并遇到句柄时,我们需要单步执行内部拉动并处理执行此操作时发生的任何错误。但是 step 被定义为适用于具有 monad 实例的任意效果 F[_],并且 monad 不提供处理错误的能力。为了解决这个问题,让我们引入一个新的类型类,它为 Monad 添加了错误处理功能:

trait MonadThrow[F[_]] extends Monad[F]:
  extension [A](fa: F[A])
    def attempt: F[Try[A]]
    def handleErrorWith(h: Throwable => F[A]): F[A] =
      attempt.flatMap:
        case Failure(t) => h(t)
        case Success(a) => unit(a)
 
  def raiseError[A](t: Throwable): F[A]

MonadThrow 扩展了 Monad,并添加了引发错误并在以后处理它们的能力。


练习 15.12

——————————————————————————————

定义 MonadThrow 的法律 .考虑尝试应如何与 raiseError 和 unit 交互。


现在,我们可以更新步骤的定义,以采用 MonadThrow 实例:

def step[F2[x] >: F[x], O2 >: O, R2 >: R](
  using F: MonadThrow[F2]
): F2[Either[R2, (O2, Pull[F2, O2, R2])]] =
  this match
    ...
    case Handle(source, f) =>
      source match
        case Handle(s2, g) =>
          s2.handleErrorWith(x =>
            g(x).handleErrorWith(y => f(y))).step     ①
        case other =>
          other.step
            .map:
              case Right((hd, tl)) => Right((hd, Handle(tl, f)))
              case Left(r) => Left(r)
            .handleErrorWith(t => f(t).step)          ②

(1) 将左嵌套处理程序重写为右嵌套处理程序。

(2) 处理步进源拉取时出现的错误。

此实现首先检查左嵌套的错误处理程序,并将此类嵌套重写为右嵌套,就像我们对 flatMap 的左嵌套调用所做的那样。否则,它会逐步执行原始拉取,并通过将目标效果 F 委托给 handleErrorWith 方法来处理发生的任何错误6。如果没有发生错误,则我们将错误处理程序传播到余数拉取。

由于步骤采取MonadThrow,各种消除器也必须采取MonadThrow:fold,toList和run。我们同样可以通过为每个引入 raiseError 构造函数来添加对在拉取和流中显式引发错误的支持。有关完整详细信息,请参阅章节代码。

现在我们已经有了 handleErrorWith 和 raiseError for stream,我们可以定义 onComplete :

extension [F[_], O](self: Stream[F, O])
  def onComplete(that: => Stream[F, O]): Stream[F, O] =
    self.handleErrorWith(t => that ++ raiseError(t)) ++ that

onComplete 方法允许我们在源流完成后评估流,无论源流是成功完成还是失败并出现错误。当传递给 onComplete 的流评估效果时,此功能尤其强大。

onComplete 是否允许我们安全地获取和释放资源,例如文件句柄?一起来看看:

def acquire(path: String): Task[Source] =
  Task(Source.fromFile(path))
 
def use(source: Source): Stream[Task, Unit] =
  Stream.eval(Task(source.getLines))
    .flatMap(itr => Stream.fromIterator(itr))
    .mapEval(line => Task(println(line)))
 
def release(source: Source): Task[Unit] =
  Task(source.close())
 
val printLines: Stream[Task, Unit] =
  Stream.eval(acquire("path/to/file")).flatMap(resource =>
    use(resource).onComplete(Stream.eval(release(resource))))

我们首先评估获取任务以打开文件,然后对结果进行平面映射,构建一个使用打开的文件的流,并通过 onComplete .然后,我们将文件的每一行打印到控制台。如果我们运行它,也许在资源获取和完成时添加了一些日志记录,我们将看到它完全符合我们的需求。

不幸的是,这种方法有一个重大局限性;它是不可组合的。考虑一下如果我们稍微重构一下,提取每一行的打印,直到调用onComplete之后会发生什么:

val printLines: Stream[Task, Unit] =
  Stream.eval(acquire("path/to/file"))
    .flatMap: resource =>
      Stream.eval(source => Task(source.getLines))
        .flatMap(itr => Stream.fromIterator(itr))
        .onComplete(Stream.eval(release(resource)))
    .mapEval(line => Task(println(line)))

该程序的行为应该与原始程序相同,实际上,当没有错误时,它就会这样做。但是,如果最终的 mapEval 失败并出现异常,则不会调用 onComplete 注册的错误处理程序,整体计算将失败,并且资源永远不会最终确定。

另一个有问题的情况是提前终止,因为只采用了源流的某些元素。想象一下,如果我们改变使用方式来获取文件的前 10 行。如果我们把 take(10) 放在 使用主体 中,那么整个流的行为是正确的。但是,如果我们将 take(10) 移动到调用 onComplete 之后,则返回前 10 行,并且资源永远不会最终确定。

在这两种情况下,无论流中发生什么其他情况,我们都希望资源最终确定。仅仅依靠 flatMap 和 onComplete 并不能给我们足够的能力来提供这种保证——我们很快就会回到这个保证。

向 Stream 添加对错误处理的支持会导致对其消除表单的更强约束。因此,不再可能将 Stream[IO, X] 转换为 IO[List[X]] 或任何其他 IO 值,因为 IO 没有 MonadThrow 实例。这是正确的权衡吗?实际上,大多数 monadic 效果类型都支持错误处理,因此此约束并不像最初看起来那样具有限制性,但值得尝试其他 API 选择。

无效流呢?我们之前在 TailRec monad 中评估了它们,它也没有 MonadThrow 实例。这更容易解决 - 我们可以向 TailRec 添加错误处理,就像我们将其添加到 IO 一样,通过在里面嵌入一个 Try:

opaque type SyncTask[A] = TailRec[Try[A]]

我们可以定义一个 MoandThrow[SyncTask] 实例并修改无效消除器以使用 SyncTask 而不是 TailRec 来评估流。通常,每次向流和拉取添加其他功能时,都必须注意该功能如何更改消除器的要求。

15.3.3 确保资源安全

Stream 可用于与任意资源通信,不仅是文件,还包括数据库连接、网络套接字等。在上一节中,我们尝试打开一个文件并确保无论流计算如何终止都将其关闭,我们发现这样做比添加对错误处理的支持更困难。让我们重新审视我们需要支持的各种类型的终止。

假设我们有行:Stream[任务,字符串]表示大文件的行。这是一个或生产者,它引用了我们希望确保关闭的资源(文件句柄),无论如何使用此生产者

我们何时应该关闭此文件句柄 — 在程序的最后?不。理想情况下,一旦我们知道我们完成了从行中读取,我们就会关闭文件。如果我们到达文件的最后一行,我们当然就完成了;此时,没有更多要生成的值,可以安全地关闭文件。因此,这给了我们第一个要遵循的规则:生产者应该在知道它没有进一步的值时立即释放任何底层资源,无论是由于正常耗尽还是异常

但是,这还不够,因为流的使用者本身可能决定提前终止消费。考虑 lines.take(5).toList 。对 take(5) 的调用将在仅收到五行后停止,可能是在文件耗尽之前。在这种情况下,我们希望确保在整个流完成之前运行任何必要的资源清理。请注意,toList 不能对此负责,因为 toList 不知道它所解释的流在内部由其他流组成,其中一个流需要完成。因此,我们要遵循第二条规则:任何使用来自另一个流 t 的值 s 必须确保在 s 停止之前运行 t 的清理操作

总而言之,流 s 可能会由于以下原因而终止

  • 当源没有其他要输出的值时,生产者耗尽
  • 当源的其余部分被丢弃时提前终止
  • 引发异常时异常终止

无论原因如何,我们都希望在每种情况下关闭基础资源。

现在我们有了指导方针,我们如何实际实施呢?为了处理提前终止,我们需要一种方法来运行源流的清理操作,即使它被丢弃。这建议了一种设计,其中获取资源的清理操作由解释器跟踪,而不是由flatMap和handleErrorWith组成的流中的原始操作。让我们介绍一个新的流构造函数来捕获此模式:

def resource[F[_], R](acquire: F[R])(release: R => F[Unit]): Stream[F, R]

资源操作从 get 创建单个元素流,并保证根据我们建立的准则评估发布操作。我们如何实现资源?在本节的其余部分,我们将介绍一种方法,但不要担心实现的所有细节。这里的重要概念是资源的行为方式,而不是资源的实现方式。

首先,我们将介绍发生流解释的范围的概念。作用域可以是打开的,也可以是封闭的。开放范围具有关联的清理操作,我们将其称为终结器。开放范围也可以具有任意数量的子范围。我们可以通过打开引用该终结器的子作用域来在作用域上注册一个新的终结器。可以关闭开放范围,其效果是先关闭所有子范围,然后运行终结器。

由于我们将终结器表示为 F[Unit] 类型的值,因此 Scope 类型必须由我们的效果类型参数化。为了实现各种状态转换,例如打开子范围和关闭范围树,我们需要能够在效果 F 中操作状态。我们可以使用一个新类型来做到这一点— 参考 :

final class Ref[F[_], A] private (
  underlying: AtomicReference[A],
  delay: [A] => (() => A) => F[A]       ①
):
 
  def get: F[A] = delay(() => underlying.get)
 
  def set(a: A): F[Unit] = delay(() => underlying.set(a))
 
  def modify[B](f: A => (A, B)): F[B] = delay: () =>
    @annotation.tailrec
    def loop(): B =
      val oldA = underlying.get
      val (newA, result) = f(oldA)
      if underlying.compareAndSet(oldA, newA) then result else loop()
    loop()
 
object Ref:
  def apply[F[_], A](initial: A)(using F: Monad[F]): Ref[F, A] =
    new Ref(new AtomicReference[A](initial),
      [A] => (th: () => A) => F.unit(()).map(_ => th()))

(1)将任意的投掷提升为有效的动作

Ref[F, A] 是 A 型变量的纯泛函等价物,其中所有访问和突变都在效果 F 中执行。此实现使用 java.util .concurrent。原子引用作为基础存储,同时公开有效的访问器和突变器。

修改操作特别有用。它以原子方式从当前值计算一个新值,并允许返回任意附加值。使用 Ref 和 修改 ,我们可以将 Scope 实现为具有打开和关闭状态的状态机:

class Scope[F[_]](
  parent: Option[Scope[F]],
  val id: Id,
  state: Ref[F, Scope.State[F]]
)(using F: MonadThrow[F]):
  import Scope.State
 
  def open(finalizer: F[Unit]): F[Scope[F]] = ???
  def close: F[Unit] = ???
 
object Scope:
  enum State[F[_]]:
    case Open(
      finalizer: F[Unit], subscopes: Vector[Scope[F]]) extends State[F]
    case Closed() extends State[F]
 
  def root[F[_]](using F: MonadThrow[F]): Scope[F[_]] =
    new Scope(None, new Id, Ref(State.Open(F.unit(()), Vector.empty)))

作用域将其可变状态存储在 Ref[F, 状态 [F]] 中。状态 [F] 类型是由打开和关闭案例组成的枚举。Open 存储对此作用域和子作用域的终结器的引用。

可以通过计算注册新子范围的新打开状态来打开子范围:

def open(finalizer: F[Unit]): F[Scope[F]] =
  state
    .modify:
      case State.Open(myFinalizer, subscopes) =>
        val sub = new Scope(Some(this), new Id,
                            Ref(State.Open(finalizer, Vector.empty)))
        State.Open(myFinalizer, subscopes :+ sub) -> F.unit(sub)
      case State.Closed() =>
        val next = parent match
          case None =>
            F.raiseError(new RuntimeException("root scope already closed"))
          case Some(p) => p.open(finalizer)
        State.Closed() -> next
    .flatten

我们有两种情况需要处理:在开放作用域上调用 open 时,以及在封闭作用域上调用 open 时。如果作用域是开放的,那么我们首先为新的终结器构造一个新作用域,然后计算一个新的 Open 状态,该状态在其子作用域中引用新作用域。回想一下,修改的签名允许我们返回任意值以及更新的状态;这里我们返回一个 F[Scope[F]] 作为该任意值。暂时忽略 Closed 的情况,我们有一个 state.modify(...) 返回一个 F[F[Scope[F]],我们将其展平为 F[Scope[F]]。

这种模式可扩展到我们可以想象的任何状态机。我们将当前状态存储在 Ref 中,并根据 ref.modify(f).flatten 执行转换,其中 f 从当前状态计算下一个状态,并在成功修改 Ref 的状态后返回一个要评估的动作。

回到已关闭的情况,我们可以选择将打开封闭作用域上的子作用域设置为错误。相反,我们会将打开的请求传播到父范围。该范围也可能是封闭的,这将导致递归传播到树上,直到我们遇到开放范围或到达根。只有当我们到达根而没有遇到开放范围时,我们才会发出错误信号。关闭操作以类似的方式实现:

def close: F[Unit] =
  state
    .modify:
      case State.Open(finalizer, subscopes) =>
        val finalizers = (subscopes.reverseIterator.map(_.close) ++
          Iterator(finalizer)).toList
        def go(rem: List[F[Unit]], error: Option[Throwable]): F[Unit] =
          rem match
            case Nil => error match
              case None => F.unit(())
              case Some(t) => F.raiseError(t)
            case hd :: tl =>
              hd.attempt.flatMap(res =>
                go(tl, error orElse res.toEither.swap.toOption))
        State.Closed() -> go(finalizers, None)
      case State.Closed() => State.Closed() -> F.unit(())
    .flatten

在开放作用域上调用 close 会将作用域的状态转换为 已关闭 。我们计算一个关闭所有子作用域的操作,然后运行原始作用域的终结器。我们必须小心颠倒子作用域的顺序,以确保终结器以类似堆栈的方式运行(即终结器必须按后进先出顺序运行)。我们还必须小心运行所有终结器,即使发生错误也是如此。我们不能只是遍历操作列表;否则,我们会在第一个错误上短路。

现在让我们将作用域集成到流和拉取中。我们将首先在 Pull 枚举中创建两个新案例:

case OpenScope[+F[_], +O, +R](
  source: Pull[F, O, R], finalizer: Option[F[Unit]]
) extends Pull[F, O, R]
 
case WithScope[+F[_], +O, +R](
  source: Pull[F, O, R], scopeId: Id, returnScope: Id
) extends Pull[F, O, R]

OpenScope 引用了源拉取器和终结器。在遇到OpenScope时,解释器在活动范围上调用open,传递终结器。然后,它使用新打开的作用域将源拉取解释为新的活动作用域。WithScope 引用源拉取、解释该源拉取期间要使用的作用域的 ID,以及源拉取终止时要返回到的范围 ID。

这些新情况暗示,解释器(即折叠和步骤)需要跟踪活动范围并允许步骤更改当前范围。若要允许活动范围更改,我们需要更改步骤的结果以包含新的活动范围。我们可以将结果类型从 Both[R, (O, Pull[F, O, R])] 更改为 (Scope[F], Both[R, (O, Pull[F, O, R])]),但这有点让人不知所措。相反,让我们引入一个表示步骤结果的新类型:

enum StepResult[F[_], +O, +R]:
  case Done(scope: Scope[F], result: R)
  case Out(scope: Scope[F], head: O, tail: Pull[F, O, R])

让我们修改折叠以管理活动范围。我们需要构造一个根作用域并在折叠完成后将其关闭,并且我们需要更改每个 StepResult.Out 上的活动作用域:

def fold[F2[x] >: F[x], R2 >: R, A](init: A)(f: (A, O) => A)(
  using F: MonadThrow[F2]
): F2[(R2, A)] =
  val scope = Scope.root[F2]
  def go(scope: Scope[F2], p: Pull[F2, O, R2], acc: A): F2[(R2, A)] =
    p.step(scope).flatMap:
      case StepResult.Done(_, r) => F.unit((r, init))
      case StepResult.Out(newScope, hd, tl) =>
        go(newScope, tl, f(init, hd))
  go(scope, this, init).attempt.flatMap(res =>
    scope.close.flatMap(_ => res.fold(F.raiseError, F.unit))
  )

步骤操作现在将活动范围作为参数,步骤的结果指示要使用的下一个范围。修改步骤以处理 OpenScope 相当简单:我们委托在活动范围上打开,然后我们使用 WithScope 使用新打开的子范围评估原始源流。终止后,我们将恢复原始范围:

case OpenScope(source, finalizer) =>
  scope.open(finalizer.getOrElse(F.unit(()))).flatMap(subscope =>
    WithScope(source, subscope.id, scope.id).step(subscope))

WithScope的修改步骤涉及更多。我们首先在范围树中搜索目标范围,然后使用该范围单步执行源流。如果这样做会导致 Done ,则我们关闭目标范围并查找要返回的范围。否则,我们通过 Out 中返回的剩余流传播 WithScope:

case WithScope(source, scopeId, returnScopeId) =>
  scope.findScope(scopeId)
    .map(_.map(_ -> true).getOrElse(scope -> false))
    .flatMap:
      case (newScope, closeAfterUse) =>
        source.step(newScope).attempt.flatMap:
          case Success(Out(scope, hd, tl)) =>         ①
            F.unit(Out(scope, hd,
                        WithScope(tl, scopeId, returnScopeId)))
          case Success(Done(outScope, r)) =>          ②
            scope.findScope(returnScopeId)
              .map(_.getOrElse(outScope))
              .flatMap: nextScope =>
                scope.close.as(Done(nextScope, r))
          case Failure(t) =>                          ③
            scope.close.flatMap(_ => F.raiseError(t))

(1)源输出一个元素,所以我们必须通过尾部传播WithScope。

(2) 源终止,因此我们关闭目标范围并将下一个范围重置为返回范围。

(3) 步进时出错,因此关闭目标范围,然后重新引发错误。

通过这些更改,我们可以根据 Eval 和 OpenScope 实现资源:

def resource[F[_], R](acquire: F[R])(release: R => F[Unit]): Stream[F, R] =
  Pull.Eval(acquire).flatMap(r =>
    Pull.OpenScope(Pull.Output(r), Some(release(r))))

此实现处理生产者耗尽和异常终止。不过,我们仍然需要做一件事来处理提前终止:任何时候我们可能丢弃流的其余部分,我们都必须在新的范围内这样做。这样做可确保将所有资源分配给子范围,并且尽早关闭整个范围子树。

让我们向 Stream 添加一个打开新作用域的作用域方法,然后让我们在任何可能部分评估流的操作(如 take、takeWhile 等)上引入此类作用域:

def scope: Stream[F, O] =
  Pull.OpenScope(self, None)
 
def take(n: Int): Stream[F, O] =
  self.take(n).void.scope

通过此更改,take 引入了新的范围。如果 take 部分评估其源,则永远不会达到源的 Done 案例。但是 take 引入的作用域到达了它的尽头,整个子树被关闭,包括在部分评估的源流期间分配的任何资源。

将所有这些部分放在一起,我们可以回到练习 15.8,这次使用我们有效的 Stream 版本:

def file(path: String): Stream[Task, Source] =
  Stream.resource(Task(Source.fromFile(path)))(s => Task(s.close()))
 
def lines(path: String): Stream[Task, String] =
  file(path).flatMap(source =>
    Stream.eval(Task(source.getLines)).flatMap(Stream.fromIterator)
  )
 
def fileWriter(path: String): Stream[Task, BufferedWriter] =
  Stream.resource(
    Task(Files.newBufferedWriter(Paths.get(path))))(w => Task(w.close()))
 
def writeLines(path: String): Pipe[Task, String, Unit] =
  lines => fileWriter(path).flatMap(writer =>
    lines.mapEval(line => Task:
      writer.write(line)
      writer.newLine
    ))
 
val conversion: Pipe[Task, String, String] =
  trimmed andThen
  nonEmpty andThen
  nonComment andThen
  asDouble andThen
  convertToCelsius andThen
  toString
 
def convert(inputFile: String, outputFile: String): Task[Unit] =
  lines(inputFile).pipe(conversion).pipe(writeLines(outputFile)).run

15.3.4 动态资源分配

现实的程序可能需要动态分配资源,同时转换一些输入流。例如,我们可能会遇到如下情况:

  • 动态资源分配 - 读取包含文件名列表的文件(华氏度.txt)。将这些文件连接成单个逻辑流,将此流转换为摄氏度,并将加入的流输出为摄氏度.txt。
  • 多接收器输出 - 这类似于动态资源分配,但不是生成单个输出文件,而是为每个输入文件生成一个以华氏.txt为单位的输出文件。通过将 .celsius 附加到输入文件名来命名输出文件。

这些功能是否可以以保留资源安全的方式合并到我们对 Stream 的定义中?是的,他们可以!我们实际上已经有能力做这些事情,使用我们已经为任意 Stream 类型定义的 flatMap 组合器。

例如,flatMap 加上我们现有的组合器让我们编写第一个场景,如下所示:

def convertAll(inputFile: String, outputFile: String): Task[Unit] =
  lines(inputFile)
    .flatMap(lines).pipe(conversion).pipe(writeLines(outputFile)).run

此代码是完全资源安全的,这意味着所有文件句柄将在完成后由运行器自动关闭,即使存在异常也是如此。

我们可以通过切换对flatMap的调用顺序来写入多个文件:

def convertMultisink(inputFile: String): Task[Unit] =
  lines(inputFile).flatMap(file =>
    lines(file).pipe(conversion).pipe(writeLines(file + ".celsius"))).run

15.4 应用

本章中提出的想法具有广泛的适用性。在流处理方面可以投射出数量惊人的程序 - 一旦你意识到抽象,你就会开始在任何地方看到它。让我们看一下它适用的一些域:

  • 文件 I/O - 我们已经演示了如何将流处理用于文件 I/O。尽管我们在这里的示例中专注于逐行读取和写入,但我们也可以使用该库来处理二进制文件。
  • 消息处理、状态机和执行组件 - 大型系统通常组织为松散耦合组件的系统,这些组件通过消息传递进行通信。这些系统通常以参与者的形式表示,参与者通过显式消息发送和接收进行通信。我们可以将这些架构中的组件表示为流计算,这使我们能够使用高级组合 API 描述极其复杂的状态机和行为。
  • 服务器和 Web 应用程序 - 可以将 Web 应用程序视为将 HTTP 请求流转换为 HTTP 响应流。
  • UI 编程 - 我们可以将单个 UI 事件(例如鼠标单击)视为流,并将 UI 视为一个大型流计算网络,以确定 UI 如何响应用户交互。
  • 大数据和分布式系统 - 流处理库可以分布式并行化,以处理大量数据。这里的关键见解是,流处理网络的节点不需要都位于同一台机器上。

如果您想了解有关这些应用程序(和其他应用程序)的更多信息,请参阅章节注释 (https://github.com/fpinscala/fpinscala/wiki) 以获取更多讨论和进一步阅读的链接。

FS2 库 (https://fs2.io/) 以本章中的思想为基础,提供工业级流式处理库。FS2 提供流、拉取和管道,并支持此处未讨论的各种功能,包括并发、扇入-扇出、中断-恢复以及对现代效果类型的支持,这些功能类型本身比本书中开发的 IO 和 Task 类型强大得多。它还具有一组丰富的 API,用于与文件、网络套接字以及各种其他源和接收器进行交互。各种更高级别的库建立在FS2上,包括流行的Web框架http4s(https://http4s.org)和数据库库,如臭鼬(https://typelevel.org/skunk)和doobie(https://tpolecat.github.io/doobie/)。

15.5 结论

我们以一个简单的前提开始这本书:我们只使用纯函数来组装我们的程序。从这个唯一的前提及其后果出发,我们被引导探索一种全新的编程方法,这种方法既连贯又有原则。在最后一章中,我们构建了一个用于流处理和增量 I/O 的库,证明我们可以保留本书中开发的作曲风格,即使是与外部世界交互的程序。我们关于如何使用FP来构建大型和小型程序的故事现在已经完成。

FP是一个深刻的话题,我们只是触及了表面。到目前为止,您应该拥有继续自己旅程所需的一切,使函数式编程成为您自己工作的一部分。虽然好的设计总是很困难,但随着时间的推移,在功能上表达你的代码将变得毫不费力。当您将FP应用于更多问题时,您会发现新的模式和更强大的抽象。享受旅程,继续学习,祝你好运!

总结

  • 命令式 I/O 是整体式的,将高级算法与与迭代和资源安全相关的低级细节混合在一起。
  • 流式计算具有许多重要属性:它们是增量的、资源安全的和组合的。它们还可能支持提前终止(即部分评估)、错误处理和评估效果。
  • 流和拉取允许在前面章节中构建的效果类型之上构建可组合、资源安全的流式处理计算。
  • take(5) 和 takeWhile(_ > 10) 是可能提前终止流计算的操作示例。
  • Pull 类型提供了用于构造流计算的表达式构造函数和组合器,大量使用一元递归。
  • Pull[F, O, R] 描述了一种流计算,它可以评估 F[_] 类型的效果,输出零个或多个 O 类型的值,并以 R 类型的最终结果完成。
  • 流类型在拉取时提供不同的 API,用于将焦点从构成子计算转移到计算的输出元素。流补充拉动而不是取代它。
  • Stream[F, O] 描述了一种流计算,它可以评估 F[_] 类型的效果并输出 O 类型的零个或多个值。
  • 流[F, O]可以转换为拉[F,O,单位],反之亦然。
  • “流”和“拉取”都具有行为不同的 monad 实例。Stream 实例提供类似列表的映射然后连接行为,而拉取实例提供流式计算的排序。
  • Pipe[F, I, O] 是 Stream[F, I] => Stream[F, O] 的别名。
  • 管道组合只是函数组合(例如,p1 和 p2)。
  • 接收器是输出“单位”或“无”值的流的管道。
  • 若要确保在创建者耗尽时必须最终确定资源安全资源,流评估提前终止,或者流评估由于未处理的错误而终止。
  • 流上的资源操作从采集效果 F[R] 和终结器 R => F[Unit] 构造一个流 [F, R]。
  • Ref[F, A] 类型对存储值 A 的可变单元进行建模,并通过效果 F 提供访问器或突变器。
  • 流计算在各种领域中具有广泛的适用性,包括文件和网络 I/O、消息传递系统、HTTP 服务器、用户界面和分布式数据处理。

15.6 练习答案


答案 15.1

——————————————————————————————

对于 fromListViaUnfold 和 fromLazyListViaUnfold ,我们从初始输入开始展开,在每次迭代中,我们在当前状态上进行模式匹配,如果输入为空,则返回左,如果输入为空,则返回右包装头部和反面。我们必须映射结果以丢弃最终展开状态:

def fromListViaUnfold[O](os: List[O]): Pull[O, Unit] =
  unfold(os):
    case Nil => Left(Nil)
    case hd :: tl => Right((hd, tl))
  .map(_ => ())
 
def fromLazyListViaUnfold[O](os: LazyList[O]): Pull[O, Unit] =
  unfold(os):
    case LazyList() => Left(LazyList())
    case hd #:: tl => Right((hd, tl))
  .map(_ => ())


答案 15.2

——————————————————————————————

我们输出初始值,然后递归调用迭代,f(initial) 的结果作为新的初始值:

def iterate[O](initial: O)(f: O => O): Pull[O, Nothing] =
  Output(initial) >> iterate(f(initial))(f)


答案 15.3

——————————————————————————————

我们使用递归函数实现 drop,该函数取消对单个元素的约束并递归,直到剩余的要丢弃的数字达到零:

def drop(n: Int): Pull[O, R] =
  if n <= 0 then this
  else uncons.flatMap:
    case Left(r) => Result(r)
    case Right((_, tl)) => tl.drop(n - 1)

takeWhile 的相似之处在于,我们首先取消一个元素,然后决定是否递归。如果 unconsed 元素传递谓词,那么我们输出它,然后在尾部调用 takeWhile。如果未协调的元素使谓词失败,则我们返回作为拉取结果附加到尾部的元素。相反,如果我们耗尽源代码,则返回结果:

def takeWhile(f: O => Boolean): Pull[O, Pull[O, R]] =
  uncons.flatMap:
    case Left(r) => Result(Result(r))
    case Right((hd, tl)) =>
      if f(hd) then Output(hd) >> tl.takeWhile(f)
      else Result(Output(hd) >> tl)

dropWhile 与 takeWhile 非常相似,只是我们不输出传递谓词的元素:

def dropWhile(f: O => Boolean): Pull[Nothing, Pull[O, R]] =
  uncons.flatMap:
    case Left(r) => Result(Result(r))
    case Right((hd, tl)) =>
      if f(hd) then tl.dropWhile(f)
      else Result(Output(hd) >> tl)


答案 15.4

——————————————————————————————

我们使用一个递归函数,它将累积的总数作为参数,以及下一个拉动。我们从拉动中解开下一个元素,并将其与到目前为止的总数相结合。我们输出新的总数和递归,传递新的总数和uncons的尾部。相反,如果我们耗尽源拉取,则返回源拉取的结果:

def tally[O2 >: O](using m: Monoid[O2]): Pull[O2, R] =
  def go(total: O2, p: Pull[O, R]): Pull[O2, R] =
    p.uncons.flatMap:
      case Left(r) => Result(r)
      case Right((hd, tl)) =>
        val newTotal = m.combine(total, hd)
        Output(newTotal) >> go(newTotal, tl)
  Output(m.empty) >> go(m.empty, this)


答案 15.5

——————————————————————————————

与上一个练习中的 Tally 一样,我们使用一个递归函数,它将当前状态和下一个拉取作为参数,将状态表示为不可变的整数队列。该函数取消元素并将其添加到队列中,确保队列不会超过指定的大小。然后我们输出队列的平均值并在尾部递归:

extension [R](self: Pull[Int, R])
  def slidingMean(n: Int): Pull[Double, R] =
    def go(
      window: collection.immutable.Queue[Int],
      p: Pull[Int, R]
    ): Pull[Double, R] =
      p.uncons.flatMap:
        case Left(r) => Result(r)
        case Right((hd, tl)) =>
          val newWindow = if window.size < n then window :+ hd
                                             else window.tail :+ hd
          val meanOfNewWindow = newWindow.sum / newWindow.size.toDouble
          Output(meanOfNewWindow) >> go(newWindow, tl)
    go(collection.immutable.Queue.empty, self)


答案 15.6

——————————————————————————————

在每个函数中,传递给递归驱动程序函数的累加器类型成为 mapAccumulate 的状态类型。我们必须通过映射 mapAccumulate 的结果来丢弃最终状态:

def countViaMapAccumulate: Pull[Int, R] =
  Output(0) >> mapAccumulate(0)((s, o) => (s + 1, s + 1)).map(_(1))
 
def tallyViaMapAccumulate[O2 >: O](using m: Monoid[O2]): Pull[O2, R] =
  Output(m.empty) >>
    mapAccumulate(m.empty): (s, o) =>
      val s2 = m.combine(s, o)
      (s2, s2)
    .map(_(1))
 
extension [R](self: Pull[Int, R])
  def slidingMeanViaMapAccumulate(n: Int): Pull[Double, R] =
    self
      .mapAccumulate(Queue.empty[Int]): (window, o) =>
        val newWindow = if window.size < n then window :+ o
                                          else window.tail :+ o
        val meanOfNewWindow = newWindow.sum / newWindow.size.toDouble
        (newWindow, meanOfNewWindow)
      .map(_(1))


答案 15.7

——————————————————————————————

一种解决方案是将提供的函数映射到每个元素上,并使用逻辑 OR 幺半群组合所有生成的布尔值。此实现遍历整个源代码,而不是在遇到传递谓词的元素时立即停止:

def exists[I](f: I => Boolean): Pipe[I, Boolean] =
  src => src.map(f).toPull.tally(using Monoid.booleanOr).toStream

我们根据非停止版本定义停止版本,将其与 takeThrough 和 dropWhile 结合使用:

def takeThrough[I](f: I => Boolean): Pipe[I, I] =
  src => src.toPull.takeWhile(f)
           .flatMap(remainder => remainder.take(1)).void.toStream
 
def dropWhile[I](f: I => Boolean): Pipe[I, I] =
  src => src.toPull.dropWhile(f)
           .flatMap(remainder => remainder).void.toStream
 
def existsHalting[I](f: I => Boolean): Pipe[I, Boolean] =
  exists(f) andThen takeThrough(!_) andThen dropWhile(!_)

或者,我们可以将 dropWhile(!_) 替换为 last —一个输出拉取的最后一个值的函数:

def last[I](init: I): Pipe[I, I] =
  def go(value: I, p: Pull[I, Unit]): Pull[I, Unit] =
    p.uncons.flatMap:
      case Left(_) => Pull.Output(value)
      case Right((hd, tl)) => go(hd, tl)
  src => go(init, src.toPull).toStream
 
def existsHalting[I](f: I => Boolean): Pipe[I, Boolean] =
  exists(f) andThen takeThrough(!_) andThen last(false)


答案 15.8

——————————————————————————————

让我们首先定义一个 Pipe[String, Double],它接受一行输入并将其转换为摄氏度。我们可以将其定义为单责任管道的管道,然后将它们全部组合到单个管道中:

def trimmed: Pipe[String, String] =
  src => src.map(_.trim)
 
def nonComment: Pipe[String, String] =
  src => src.filter(_.charAt(0) != '#')
 
def asDouble: Pipe[String, Double] =
  src => src.flatMap: s =>
    s.toDoubleOption match
      case Some(d) => Stream(d)
      case None => Stream()
 
def convertToCelsius: Pipe[Double, Double] =
  src => src.map(toCelsius)
 
val conversion: Pipe[String, Double] =
  trimmed andThen
  nonEmpty andThen
  nonComment andThen
  asDouble andThen
  convertToCelsius

然后,我们使用此管道转换输入文件的行,并将每个转换后的输出写入输出文件。我们需要在 IO 类型中编写一个驱动程序函数;我们打开输入和输出文件并在输入行上使用转换,将每个转换后的行写入输出文件:

import java.nio.file.{Files, Paths}
 
def convert(inputFile: String, outputFile: String): IO[Unit] =
  IO:
    val source = scala.io.Source.fromFile(inputFile)
    try
      val writer = Files.newBufferedWriter(Paths.get(outputFile))
      try
        fromIterator(source.getLines)
          .pipe(conversion)
          .fold(()): (_, a) =>
            writer.write(a.toString)
            writer.newLine()
      finally writer.close()
    finally source.close()


答案 15.9

——————————————————————————————

我们使用 Pull.Eval 创建一个 Pull[F, Nothing, O],然后用 Pull.Output 在其上平面映射,从而得到一个 Pull[F, O, Unit],相当于一个 Stream[F, O]:

def eval[F[_], O](fo: F[O]): Stream[F, O] =
  Pull.Eval(fo).flatMap(Pull.Output(_))


答案 15.10

——————————————————————————————

我们可以将平面地图与 eval 一起使用。由于我们在 Stream 伴侣中定义它,并且由于 Stream 是 Pull 上的不透明类型,我们必须注意显式引用 Stream.flatMap 而不是 Pull 上的 flatMap 方法:

extension [F[_], O](self: Stream[F, O])
  def mapEval[O2](f: O => F[O2]): Stream[F, O2] =
    Stream.flatMap(self)(o => Stream.eval(f(o)))


答案 15.11

——————————————————————————————

实现几乎相同 — 我们 Eval(f(init)) 和 flatMap 结果,如果结果发出终止信号,则终止或输出元素并使用新状态递归:

object Pull:
  def unfoldEval[F[_], O, R](init: R)(f: R =>
    F[Either[R, (O, R)]]): Pull[F, O, R] =
     Pull.Eval(f(init)).flatMap:
      case Left(r) => Result(r)
      case Right((o, r2)) => Output(o) >> unfoldEval(r2)(f)
 
object Stream:
  def unfoldEval[F[_], O, R](init: R)(f: R =>
    F[Option[(O, R)]]): Stream[F, O] =
     Pull.Eval(f(init)).flatMap:
      case None => Stream.empty
      case Some((o, r)) => Pull.Output(o) ++ unfoldEval(r)(f)


答案 15.12

——————————————————————————————

让我们首先考虑尝试如何与可访问的构造函数(即 unit 和 raiseError )交互:

unit(a).attempt == unit(Right(a))
raiseError(t).attempt == unit(Left(t))

让我们也看看handleErrorWith如何与每个构造函数交互:

unit(a).handleErrorWith(h) == unit(a)
raiseError(t).handleErrorWith(h) == h(t)
 

以下是 raiseError 如何与 flatMap 交互:

raiseError(t).flatMap(f) == raiseError(t)

我们还可能对如何处理异常提出一些更严格的要求。例如,单位是否应该捕获从其 thunk 参数中抛出的异常?

unit(throw new Exception) == raiseError(new Exception)

如果从传递给 map 和 flatMap 的函数中抛出,是否应该捕获异常?

fa.map(_ => throw new Exception) == raiseError(new Exception)
fa.flatMap(_ => throw new Exception) == raiseError(new Exception)

没有错误的答案,只有权衡。


1 当垃圾回收时,JVM实际上会关闭一个输入流(这是支持scala.io.Source的原因),但是没有办法保证这种情况会及时发生,或者根本不会发生!在不经常执行完整收集的分代垃圾回收器中尤其如此。

2 如果 indexOfSlice 的参数不作为输入的子序列存在,则返回 -1 。有关详细信息,请参阅 API 文档,或在 REPL 中试验此函数。

3 为简单起见,我们选择在本章省略一些蹦床。

4 我们可以将 F 定义为 +F[+_],因为我们总是使用作为函子的效应类型(函子本质上是协变的)。这样做可以简化各种方法的定义,但需要我们使用的效应类型在其自己的类型参数中协变。实际上,像 IO 这样的效应类型很少以协变方式定义其类型参数,因此这里我们选择了 +F[_]。

5 章节代码通常定义对 Stream 而不是 Pull 的操作,除非该操作在构建递归拉取时很有用。

6 此实现不会捕获从处理程序本身抛出的异常(即,当 f(t) 抛出时)。

Tags:

最近发表
标签列表