Interleaving Flow

The fact that we could keep sequential control flow with event-driven applications is nice. But the power of event-driven application lies in the fact, that we could execute several logical asynchronous operations on the same event loop at the same time. While events are dispatched one event time, external events that cause event dispatch actually happen in other threads and event processes. Such events include network and file IO, GUI events, timer events, etc.

And this fact was known and exploited long ago in modern GUI frameworks. The most popular and actually used area where this fact is exploited is modern GUI programming. Showing and working with one Gui window could be viewed as a logical asynchronous operation. That starts when window has shown and which finishes when window closed. There are multiple windows open at the same time, each with own state. They relatively safely interact with each others within single even loop, reacting at one event at time. Sometimes even components from different processes that use different event loops interact with each other relatively safely. And sometimes (as in Windows 3.1) all processes share the same event loop. Everything goes nice until the application needs to call non-GUI service (for example file system or network), that does not share the same event loop paradigm. In that case we experiencing freezes and other not-so-nice things.

So starting interleaving operations is an important thing and AsyncScala offers multiple ways to do it. Note that operations are started on the same vat, where operator invoked. To do something on other vat, you would actually need to use aSend(vat){...} or aLater(vat){...} operators from operators provided here (both are from AsyncControl like rest of operators discussed here).

aAll operator

The simplest operator for launching interleaving activities is aAll operator. It launches its bodies and creates a promise that resolves to tuple that consists of the resolution for its bodies. The additional bodies added by and method and the last body is added by andLast method:

  @Test
  def testAll() {
    doAsync {
      val l = new ListBuffer[Int]
      aSeq {
        expectEquals((1, 2, 3)) {
          val p = new Promise[Unit]
          val r = p.resolver
          aAll {
            aLater {
              aLater {
                l.append(1)
                1
              }
            }
          } and {
            aWhenLastI(p) {
              l.append(2)
              2
            }
          } andLast {
            success(r, ())
            l.append(3)
            3
          }
        }
      } thenLastI {
        assertEquals(List(3, 2, 1), l.toList)
      }
    }
  }

As you could seen in this somewhat convoluted test for aAll operator, all processes go in interleaving fashion, and they actually finish in reverse order than they have started. If we would have used aSeq{...}then{...}thenLast{...} operator instead of aAll operator, the asynchronous process would have never finished, since there is a dependency of the second branch on the actions of the third branch. Direct usage of promises is could lead to interesting live lock bugs. The bugs are somewhat different from dead lock bugs since they do not block the event loop, but still could case program to break. We will return to this problem when we discuss aAny operator.

Note that aAll operator waits while all branches finish even in the case when one of branches failed with exception. So you might be sure that all asynchronous operations that were argument to aAll operator finished when promise returned from aAll operator resolves. You could see it in the following test:

  @Test
  def testAllFailure() {
    doAsync {
      var visited = false
      aSeq {
        expectFailure(classOf[IllegalStateException]) {
          aAll {
            aLater {
              aLater {
                visited = true
                1
              }
            }
          } andLast {
            aFailure(new IllegalStateException())
          }
        }
      } thenLastI {
        assertTrue(visited)
      }
    }
  }

When some branch fails, the entire operator fails with the same exception. This behavior is ok when we do not need to know which branch failed. Sometimes it is important to know which branch of aAll operator failed. There is a special method on promise that creates promise for outcome. Note that you must do it for all branches, unless you are quite sure that that branch never fails:

  @Test
  def testAllOutcome() {
    doAsync {
      aSeq {
        aAll {
          aNow(1).toOutcome
        } and {
          aFailure[Int](new IllegalStateException()).toOutcome
        } andLast {
          aLater(3).toOutcome
        }
      } thenLast {
        case (Success(1), Failure(_: IllegalStateException), Success(3)) => () // ok
        case x => fail("Unexpected value: " + x)
      }
    }
  }

There is also an expression-like form of aAll operator that is more convenient for use in expression context (for example as first argument of aWhen operator):

  @Test
  def testAllShort() {
    doAsync {
      expectEquals((1, 2, 3)) {
        aAll(1, aNow(2), aLater(3))
      }
    }
  }

Only most often used forms with number of arguments up to four are in the object AsyncControl, the rest of the expression forms could be located at AllExpressions object

aAllFor and aAllForUnit

There are also loop forms for aAll operator named aAllFor. This loop starts its bodies in parallel, however the folding is done sequentially, just like in case of aSeqFor loop. This is because there is a data dependency on previous folding result. Also if body or previous folding fails, the folding stops.

  @Test
  def testAllFoldLeft() {
    val list = List(1, 2, 3)
    doAsync {
      expectEquals(list.reverse) {
        aAllFor(list) {
          i => i
        }.foldLeft(List[Int]()) {
          (a, b) => b :: a
        }
      }
    }
  }

  @Test
  def testAllFoldRight() {
    val list = List(1, 2, 3)
    doAsync {
      expectEquals(list) {
        aAllFor(list) {
          i => i
        }.foldRight(List[Int]()) {
          (a, b) => a :: b
        }
      }
    }
  }

Like with aSeqFor operator, there are some utility forms for aAllFor operator.

  @Test
  def testAllForList() {
    val list = List(1, 2, 3)
    doAsync {
      expectEquals(list) {
        aAllFor(list) {
          i => i
        }.toList
      }
    }
  }

  @Test
  def testAllForUnit() {
    val list = List(1, 2, 3)
    doAsync {
      expectEquals(6) {
        var sum = 0
        aSeq {
          aAllForUnit(list) {
            i => sum += i
          }
        } thenLastI {
          sum
        }
      }
    }
  }

Limiting Interleaving

Note, that aAllFor operator start its bodies as soon as it is possible (for synchronous collections everything is started on the same turn). So aAllFor for allocate all needed resources for its work (arrays, buffers, files, etc.), and if you start a lot of activities at the same time, you could fail due to the lack of resources. There are several components that allow to perform such self moderation. They will be considered in TBD section of the guide.

aAny

Other kind of interleaving activity is that when you need to launch several activities at the same time, but you need a result from any of them that finishes first, the result from other one could be ignored. For this purpose there is aAny operator. It launches all its bodies and uses the first result that arrived, independently of whether it was success of failure.

  @Test
  def testAny() {
    doAsync {
      expectEquals(2) {
        aAny {
          aLater(aLater(1))
        } or {
          aNow(2)
        } orLast {
          aLater(3)
        }
      }
    }
  }

  @Test
  def testAnyFailure() {
    doAsync {
      expectFailure(classOf[IllegalStateException]) {
        aAny {
          aLater(aLater(1))
        } or {
          aFailure(new IllegalStateException("Test"))
        } orLast {
          aLater(3)
        }
      }
    }
  }

The sample from the aAll, could be rewritten using aSeq operator as the following:

import net.sf.asyncobjects.asyncscala.Promise
import net.sf.asyncobjects.asyncscala.AsyncControl._
import net.sf.asyncobjects.asyncscala.Promise._
import net.sf.asyncobjects.asyncscala.util.ResourceUtil._
import net.sf.asyncobjects.asyncscala.nio.adapters.Adapters
import java.nio.ByteBuffer

object SeqHangUpSample {
  def main(args:Array[String]) {
    doAsync {
      aAny {
        val p = new Promise[Unit]
        val r = p.resolver
        aSeq {
          aLater {
            aLater {
              println("First!")
            }
          }
        } thenI {
          aWhenLastI(p) {
            println("Second!")
          }
        } thenLastI {
          success(r, ())
          println("Third!")
        }.toUnit
      } orLast {
        println("Press Enter to Exit..")
        aUsing(Adapters.actorInput(System.in)) { in =>
          in.read(ByteBuffer.allocate(10))
        }.toUnit
      }
    }
  }

And you could see that the program does exits on its own when it is run. To exit the program you need to press enter key, so the second branch of aAny operator would read something from std and exits. The asynchronous IO streams are discussed in the later section of the document, so just ignore it for now.

In future it is planned to add a variant of aAny that prefer successful values, and returns a failure only if there is no more hope for successful outcome. It is also planned to add a variant of aAny that allows cancelling non-finished activities and closing resources that did not got to client.