Streams

Note: note that the functionality described in this section will be likely redesigned in the future.

Most of demos related to asynchronous programming show, how to work with streams of uniform data. AsyncScala allows working with non-uniform streams of events, but uniform events streams are easy to implement as well. They are provided as part of library as AStream and AThunk. They are quite natural to implement within library.

ACloseable

The both types of the streams might hold some resources while they are active, and need to free resources or perform some unregistration when they are done with. So they extend ACloseable interface. The interface just contains a single close method:

trait ACloseable extends Asynchronous {
  def close: Promise[Unit]
}

To support work with this interface there is aUsing operator, that was originally inspired by C# using keyword. The different implementations of this operator are in ResourceUtil object.

  def aUsing[R <: ACloseable, T](open: => Promise[R])(body: R => Promise[T]): Promise[T] = {
    var r: Option[R] = None
    aWhen(open) {o =>
      require(o != null, "Resource reference must not be null")
      r = Some(o)
      body(o)
    }.finallyDo {
      r match {
        case Some(r) => r.close
        case None => aUnit
      }
    }
  }  

AStream

AStream interface is simply iterator over some data, with None indicating end of stream.

trait AStream[T] extends ACloseable {
  def next: Promise[Option[T]]
}

AThunk

AThunk is even simpler. Note that put method returns a promise for Unit. This promise is supposed to be resolved when thunk is ready to accept more data. This can be used to implement flow control.

trait AThunk[-T] extends ACloseable {
  def put(t: T): Promise[Unit]
}

With these pieces in place, it is possible to implement a lot of different scenarios.

Trivial Streams

The trivial streams are provided in Streams objects. The object allows to create streams over different collections. The sample below creates stream over range of integers.

  @Test
  def testTen() {
    doAsync {
      expectEquals((1 to 10).toList) {
        val l = new ListBuffer[Int]
        aSeq {
          aUsing(Streams.iterate(1 to 10)) {s =>
            aSeqLoop {
              aWhenLast(s.next) {
                case None => false
                case Some(t) => l += t; true
              }
            }
          }
        }.thenLastI {
          l.toList
        }
      }
    }
  }

It allows to create own trivial as well without implementing stream interface. For example, the following sample creates an infinite stream of random numbers and prints first 10 values out of it.

object RandomStream {
  def main(args: Array[String]) {
    doAsync {
      val r = new Random()
      aUsing(Streams.immediate(Some(r.nextInt()))) {s =>
        aSeqForUnit(1 to 10) {_ =>
          aWhenLast(s.next) {
            case None => aFailure[Unit](new RuntimeException("Unexpected!"))
            case Some(t) => println(t)
          }
        }
      }
    }
  }
}

RichStream

There is class (to which there is an implicit conversion from AStream) RichStream that allows performing different operations on the stream. This includes mapping, filtering, trottling, and so on. The sample above could be rewritten as the following (we also filter out negative numbers, and reduce odd ones by one to demonstrate rich stream functions).

Note: the streams that are created by RichStream wrapper are not exported. So you need to call export method on the final stream to make stream safe to use from other vats. This is because the streams that are created by rich stream object are usually immediately consumed in the same vat.

object RandomRichStream {
  def main(args: Array[String]) {
    doAsync {
      val r = new Random()
      Streams.immediate(Some(r.nextInt())).filter(_ > 0).map {i =>
        if (i % 2 != 0) i - 1 else i
      }.head(10).consumeAndClose {i =>
        println(i)
        true
      }
    }
  }
}

ATimer

There is a different number possible data for stream. One of them is ATimer. Periodic events could be considered as a stream of time events. And stream close could be considered as cancellation operation.

object TimerStream {
  def main(args: Array[String]) {
    doAsync {
      aUsing(Timer.make()) {timer =>
        val start = System.currentTimeMillis()
        aUsing(timer.fixedRate(new Date(start +500), 500)) {s =>
          s.head(10).consume {i =>
            println(i - start)
            true
          }
        }
      }
    }
  }
}

Note that the timer itself uses additional thread, so it is a closeable resource as well.

GUI Streams

GUI events are also kinds of streams. They are not yet supported by library yet, but you could create own streams. For example text changes stream for JTextComponent could be created as the following:

  def textChanged(c: JTextComponent): AStream[String] = {
    require(EventQueue.isDispatchThread)
    var l: DocumentListener = null
    val (stream, thunk) = QueueStream.make[String, Unit] {() =>
      c.getDocument.removeDocumentListener(l)
    }
    l = new DocumentListener {
      def changedUpdate(e: DocumentEvent) {
        thunk.put(c.getText)
      }

      def removeUpdate(e: DocumentEvent) {
        changedUpdate(e)
      }

      def insertUpdate(e: DocumentEvent) {
        changedUpdate(e)
      }
    }
    c.getDocument.addDocumentListener(l)
    stream
}

The stream uses special QueueStream that uses Queue internally and creates a pair from AThunk and AStream. When stream closed a callback specified during stream construction is invoked. And this callback removes listener. Now lets use the stream.

object SwingStream extends JFrame("Sample") {
  def main(args: Array[String]) {
    aSend(AWTVat.get) {
      setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE)
      val c = getContentPane
      c.setLayout(new GridLayout(3, 1))
      val f = new JTextField(20)
      val l = new JLabel("")
      val tm = new JLabel("")
      c.add(f)
      c.add(l)
      c.add(tm)
      pack()
      setVisible(true)
      aSeq {
        aUsing(Timer.make()) {timer =>
          var stop = false
          var count = 0
          aAll {
            EventStreams.textChanged(f).trottle(timer, 1000L).changed.consumeAndClose {t =>
              println("Received: " + t)
              count += 1
              l.setText("" + count + ": " + t.toUpperCase)
              if (t.equals("STOP")) {
                stop = true
              }
              aSuccess(!stop)
            }
          } andLast {
            val start = System.currentTimeMillis()
            aUsing(timer.fixedRate(new Date(start + 500), 500)) {t =>
              t.consume {x =>
                tm.setText("Time: " + (x - start) + "ms")
                aSuccess(!stop)
              }
            }
          }
        }
      } finallyDo {
        println("Timer related tasks finished!")
      }
    }
  }
}

Note that the first thing what sample is doing is switching to AWTVat using aSend operation. Then we just create frame and fill it with components. Then we create timer and stream. Note that trottle the text changes stream so events will wait for one second before affecting the window (and if something arrives in meanwhile, we start with new value). And we also filter out identical text events.

In normal UI case, unsubscription is rarely needed, but to just demonstrate the feature, we stop listening for all events when user enters world "STOP" and waits for one second. It also would stop parallel timer updates. And then timer closes, UI will be still shown. Just no custom listeners will react to events.