Listening for promises works, and a lot can be done with just listeners and promises. However, a promise represents the outcome of an asynchronous operation, and thinking about promises and composing them by hand is rather low-level work. This section starts describing operators that compose actual asynchronous operations rather than promises. Asynchronous operators in the AsyncScala framework take asynchronous operations as inputs and produce a promise that represents the outcome of the composite asynchronous operation.
Scala is a rich language, and it provides very nice means to express custom control flow in the form of closures. The AsyncScala framework makes heavy use of them. Most closures used for asynchronous control flow represent an asynchronous operation, and as such return a promise. We have already seen a trivial operator that applies to an asynchronous operation in the previous section: the aLater operator. It takes a body that returns a promise, executes it in another vat or later in the same vat, and returns a promise for its outcome.
However, there are other useful asynchronous operators in the framework as well. In this section we will discuss sequential control operators.
The aWhen operator is one of the most useful operators in the framework. Like most of the basic operators (including the rest of the operators discussed here), it is defined on the AsyncControl object. It allows expressing data flow dependencies between asynchronous operations. Its complete form is demonstrated by the following sample.
```scala
import collection.mutable.ListBuffer
import net.sf.asyncobjects.asyncscala._
import AsyncControl._

/**
 * The demonstration for when construct
 */
object WhenSample {
  def main(args: Array[String]) {
    val r = doAsync {
      val a = new ListBuffer[Int]
      aWhen(aSuccess(1)) {r =>
        println("Input: " + r)
        a += r
        a += 2
        aLater(3)
      }.then {r =>
        println("From resolved body: " + r)
        a += r
        throw new RuntimeException("test")
      }.thenI {
        a += -1 // never executes
        a // needed for type inference, as type of entire construct is the type of last then body or resolved body
      }.failed {case ex =>
        println("Exception: " + ex)
        a += 4
        a
      }.finallyDo {
        println("Executing finally action")
        a += 5
      }
    }
    println("When result: " + r)
  }
}
```
This program prints the following:
```
Input: 1
From resolved body: 3
Exception: java.lang.RuntimeException: test
Executing finally action
When result: ListBuffer(1, 2, 3, 4, 5)
```
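There are the following stages in running this sample:

1. The expression aSuccess(1) resolves to 1, so the aWhen body runs with r = 1: it prints the input, adds 1 and 2 to the buffer, and returns the promise created by aLater(3).
2. When that promise resolves, the then body receives 3, prints it, adds it to the buffer, and throws a RuntimeException.
3. Because the previous stage failed, the thenI body is skipped, so -1 is never added.
4. The failed block receives the exception, prints it, adds 4, and returns the buffer as the result.
5. The finallyDo block runs last and adds 5.
6. doAsync returns the resulting buffer, and the program prints it.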
This was the full form of the aWhen operator. There are also a number of shortcuts.
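For readers more familiar with the standard library, the stages of the full form can be mimicked with scala.concurrent.Future. The sketch below is only an analogy and uses no AsyncScala API: flatMap, map, recover, and andThen stand in for the aWhen body, then, failed, and finallyDo, and the WhenAnalogy object and its run method are names made up for this illustration. Unlike an AsyncScala vat, the global execution context may run the stages on different threads.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical illustration: standard-library Futures only, no AsyncScala.
object WhenAnalogy {
  def run(): List[Int] = {
    val a = new ListBuffer[Int]
    val result: Future[ListBuffer[Int]] =
      Future.successful(1)
        .flatMap { r => a += r; a += 2; Future(3) }  // plays the role of the aWhen body
        .map[ListBuffer[Int]] { r =>                 // plays the role of then
          a += r
          throw new RuntimeException("test")
        }
        .map { _ => a += -1; a }                     // skipped: the previous stage failed
        .recover { case ex: RuntimeException =>      // plays the role of failed
          a += 4
          a
        }
        .andThen { case _ => a += 5 }                // plays the role of finallyDo
    Await.result(result, 5.seconds).toList
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Under these assumptions run() returns List(1, 2, 3, 4, 5), the same contents as the buffer printed by the sample above.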
The full form of the aWhen operator ends with finallyDo. Often, however, only some parts of the aWhen operator are actually needed. For that purpose, the Last suffix can be used to indicate that no further blocks are expected. The sample below demonstrates the aWhenLast operator. The samples are taken from the test suite. The expectEquals method checks whether the result of an asynchronous operation matches the expected value. We will see how this method is implemented later in this section.
```scala
@Test
def testWhenLast() {
  doAsync {
    expectEquals(3) {
      aWhenLast(aSuccess(2)) {r =>
        r + 1
      }
    }
  }
}
```
The operator aWhenLast is often used in expression form like the following:
```scala
@Test
def testWhenLastShort() {
  doAsync {
    expectEquals(3) {
      aWhenLast(aSuccess(2))(_ + 1)
    }
  }
}
```
If only the aWhen and then parts of the aWhen operator are needed, there is the thenLast method.
```scala
@Test
def testThenLast() {
  doAsync {
    expectEquals(5L) {
      aWhen(aSuccess(2)) {r =>
        r + 1L
      }.thenLast {
        _ + 2
      }
    }
  }
}
```
The aWhen operator might also end with a failedLast part, as in the following test. This method is used when there is no need for a finallyDo block.
```scala
@Test
def testFailedLast() {
  doAsync {
    expectEquals(-1) {
      aWhen(2) {r =>
        r / (r - 2)
      }.failedLast {
        case _ => -1
      }
    }
  }
}
```
The exception will be handled by the aWhen operator even if it is thrown from the expression in parentheses.
```scala
@Test
def testHeadFailedLast() {
  doAsync {
    expectEquals(-1) {
      val t = 2
      aWhen(t / (t - 2)) {r =>
        r * r
      } failedLast {
        case _ => -1
      }
    }
  }
}
```
Also note that failed and failedLast take partial functions, and exceptions that are not handled are passed to outer resolvers.
```scala
@Test
def testPartialCatch() {
  doAsync {
    expectFailure(classOf[ArithmeticException]) {
      aWhen(aSuccess(0)) {
        1 / _
      }.failedLast {
        case x : IllegalArgumentException => 1
      }
    }
  }
}
```
This test also demonstrates usage of the "expectFailure" method, which will be discussed later.
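The partial-function behaviour noted above has a close counterpart in the standard library, where Future.recover also takes a partial function and lets unmatched failures pass through. The sketch below is an analogy only, not AsyncScala code; PartialRecoverAnalogy, divide, and outcome are hypothetical names introduced for the illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical illustration: standard-library Futures only, no AsyncScala.
object PartialRecoverAnalogy {
  // The handler only knows about IllegalArgumentException, so an
  // ArithmeticException raised by the division is not recovered.
  def divide(d: Int): Future[Int] =
    Future.successful(d)
      .map(1 / _)
      .recover { case _: IllegalArgumentException => 1 }

  // Blocks for the outcome and captures either the value or the failure.
  def outcome(d: Int): Try[Int] = Try(Await.result(divide(d), 5.seconds))

  def main(args: Array[String]): Unit = {
    println(outcome(1)) // the division succeeds
    println(outcome(0)) // the unhandled ArithmeticException reaches the caller
  }
}
```

As in testPartialCatch, the division by zero is not covered by the handler, so the failure propagates to whoever consumes the result.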
The Last suffix is also the typical way to finish building an operator and get a promise from other operator builders: aAll, aAny, aPar, and others.
Another shortcut is the aSeq form. The aWhen operator expects an asynchronous expression as its first argument. However, the first expression is sometimes quite big and would look somewhat strange in that place. It is also possible that there is no need for two asynchronous operations tied together, and one of them would have to be empty. In those cases the aSeq operator is used. It uses the same operator builder as the aWhen operator, and aSeq {seqBody} is equivalent to aWhen(aUnit){_ => seqBody}. Note that there is no aSeqLast form, since such a form would be just equivalent to the aLater operator.
One of the previous samples could be written as follows using the aSeq operator.
```scala
@Test
def testSeqFailedLast() {
  doAsync {
    expectEquals(-1) {
      aSeq {
        val r = 2
        r / (r - 2)
      }.failedLast {
        case _ => -1
      }
    }
  }
}
```
Sometimes it is important that the previous block has finished, but its exact result is not needed. The typical case is when a promise for the Unit type is returned. Moreover, there are cases when it is not possible to get a value from the execution of the previous block, namely when the asynchronous operation returns a promise for Nothing. For such cases, it is possible to use the form with the suffix I (ignored input). If both the I and Last suffixes need to be specified, the I suffix goes after the Last suffix.
```scala
@Test
def testWhenLastI() {
  doAsync {
    expectEquals(23) {
      aWhenLastI(aUnit) {
        aLater{ 23 }
      }
    }
  }
}

@Test
def testWhenThenI() {
  doAsync {
    expectFailure(classOf[IllegalStateException]) {
      aWhenI(aUnit) {
        aLater(aUnit)
      } thenI {
        throw new IllegalStateException()
      } thenLastI {
        aFailure(new RuntimeException())
      }
    }
  }
}
```
In many cases, it is necessary to wait for some operation that returns a promise for unit or another unimportant value, and then immediately proceed with another asynchronous operation or just yield a value. For that case, there are two- and three-argument forms of the aSeqI operator:
```scala
@Test
def testSeqI() {
  doAsync {
    expectEquals(true) {
      aSeqI(aLater(aUnit), true)
    }
  }
}
```
There is also an expression form of the aSeq operator, aSeqE, that has dependencies between arguments.
```scala
@Test
def testSeqESimple() {
  doAsync {
    expectEquals(23) {
      aSeqE(aSuccess(2))(_ * 10 + 3)
    }
  }
}
```
It is practically equivalent to the aWhenLast operator, but there is an interesting twist. If the first argument is already resolved, it starts the second argument immediately, rather than on the next turn as the aWhenLast operator does.
```scala
@Test
def testSeqE() {
  doAsync {
    expectEquals(List(1, 2, 3)) {
      val l = new ListBuffer[Int]
      aSeq {
        l += 1
        val p = aSeqE(aSuccess(2)) {r =>
          l += 2
        }
        l += 3
        p
      } thenLastI {
        l.toList
      }
    }
  }
}
```
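The difference in ordering can be pictured with a toy model of a single-threaded vat. The sketch below is not AsyncScala code: Vat, later, run, and order are hypothetical names, and the queue of closures only approximates how turns are scheduled. With immediate = true the continuation runs inside the current turn, as the text describes for aSeqE with an already resolved argument; with immediate = false it is queued for a later turn, as described for aWhenLast.

```scala
import scala.collection.mutable

// Hypothetical toy model of turn scheduling; not the framework's implementation.
object TurnOrderSketch {
  // A "vat" reduced to a FIFO queue of pending turns run on one thread.
  final class Vat {
    private val turns = mutable.Queue.empty[() => Unit]
    def later(turn: () => Unit): Unit = turns.enqueue(turn)
    def run(): Unit = while (turns.nonEmpty) turns.dequeue()()
  }

  // Records the order in which the three steps of a turn are observed.
  def order(immediate: Boolean): List[Int] = {
    val vat = new Vat
    val log = mutable.ListBuffer.empty[Int]
    vat.later { () =>
      log += 1
      val continuation: () => Unit = () => log += 2
      if (immediate) continuation()   // run inside the current turn
      else vat.later(continuation)    // postpone to a later turn
      log += 3
    }
    vat.run()
    log.toList
  }

  def main(args: Array[String]): Unit = {
    println(order(immediate = true))
    println(order(immediate = false))
  }
}
```

In this model order(true) yields List(1, 2, 3), the order that testSeqE expects, while order(false) yields List(1, 3, 2).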
We have used the methods expectEquals and expectFailure extensively in this chapter. These methods are themselves implemented using the aWhenLast and aSeq operators, and their implementation is not that different from how one would implement them in synchronous code.
```scala
def expectEquals[T](v: T)(body: => Promise[T]): Promise[T] = {
  aWhenLast(body) {r =>
    Assert.assertEquals(v, r)
    r
  }
}

def expectFailure[T](t: Class[_ <: Throwable])(body: => Promise[T]): Promise[Throwable] = {
  var failed: Boolean = true
  aSeq {
    body
  } thenI {
    failed = false
    Assert.fail("The body should have failed with " + t.getName)
    null.asInstanceOf[Throwable]
  } failedLast {
    case f if (failed && f.getClass == t) => aSuccess(f)
  }
}
```
Note that the line null.asInstanceOf[Throwable] is just to help Scala type inference. This line is actually unreachable.
Since almost any asynchronous operator creates a new promise and uses the operations passed as arguments to get a result for it, it is dangerous to create tail-recursive loops that create promises without extending the compiler and the language. It is only possible to create loops that return void. However, repeating something is very common in programming, so the framework provides quite a few loop operators. This section focuses on sequential loops. In a sequential loop, the next iteration starts only when the previous one is complete. If an iteration fails, the loop finishes with that failure.
The aSeqLoop operator is the simplest loop form, and one of the most often used. Many other loop forms are based upon it. It takes as its argument a closure that returns a promise for a boolean. If the promise resolves to true, the body is executed again; if it resolves to false, the loop exits. The loop operator itself returns a promise for unit, which is resolved with the unit value when the loop finishes successfully. If the body of the loop fails with an exception, the loop fails with the same exception.
```scala
@Test
def testSeqLoop() {
  doAsync {
    expectEquals(6) {
      var s = 0
      aSeq {
        var i = 0
        aSeqLoop {
          i = i + 1
          if (i > 3) {
            false
          } else {
            aLater { s += i; true }
          }
        }
      } thenLastI {
        s
      }
    }
  }
}
```
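The contract of such a loop (repeat while the body resolves to true, stop on false, fail if the body fails) can be sketched with standard-library Futures. This is an analogy under stated assumptions, not the framework's implementation: seqLoop and sumTo are hypothetical helpers, and the global execution context replaces the vat.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical illustration: standard-library Futures only, no AsyncScala.
object SeqLoopAnalogy {
  // Re-evaluates the by-name body until it yields false. The next iteration
  // starts only after the previous Future completes, and a failed iteration
  // fails the whole loop. The leading flatMap also captures a body that
  // throws synchronously.
  def seqLoop(body: => Future[Boolean]): Future[Unit] =
    Future.successful(()).flatMap(_ => body).flatMap { again =>
      if (again) seqLoop(body) else Future.successful(())
    }

  // Mirrors testSeqLoop: sums 1..n with one asynchronous step per iteration.
  def sumTo(n: Int): Int = {
    var s = 0
    var i = 0
    val done = seqLoop {
      i += 1
      if (i > n) Future.successful(false)
      else Future { s += i; true }
    }
    Await.result(done, 5.seconds)
    s
  }

  def main(args: Array[String]): Unit = println(sumTo(3))
}
```

Because each continuation is dispatched through the execution context, the recursion in seqLoop does not grow the call stack with the number of iterations.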
The previous form looks very similar to the Java do {} while() statement. There is also a form that is similar to the normal while statement.
```scala
@Test
def testSeqWhile() {
  doAsync {
    expectEquals(6) {
      var s = 0
      aSeq {
        var i = 0
        aSeqWhileUnit(i < 3) {
          i = i + 1
          s += i
        }
      } thenLastI {
        s
      }
    }
  }
}
```
You have possibly already noticed the strange "Unit" suffix here. This suffix is present because there are other forms of the aSeqWhile operator. The form without the suffix folds the values returned from the loop body. Using the folding form, the loop above could be rewritten as follows:
```scala
@Test
def testSeqWhileFold() {
  doAsync {
    expectEquals(6) {
      var i = 0
      aSeqWhile(i < 3) {
        i = i + 1
        i
      }.foldLeft(0) {
        (a, b) => aLater(a + b)
      }
    }
  }
}
```
Note that, strictly speaking, the body of the foldLeft operation is an asynchronous operation as well. It is executed after the body of the loop finishes and before the next iteration starts. The aLater operator, quite useless in this case, is added to demonstrate this.
The aSeqWhile operator also has some standard folding operators, toList and toListBuffer, that return a promise for a List and a ListBuffer respectively.
```scala
@Test
def testSeqWhileList() {
  doAsync {
    expectEquals(List(1, 2, 3)) {
      var i = 0
      aSeqWhile(i < 3) {
        i = i + 1
        i
      }.toList
    }
  }
}
```
There is also the aSeqFor operator, which allows sequential iteration over types that extend Scala's Iterable[_] or Java's Iterable[_]. It supports all the folding constructs that are supported by the aSeqWhile operator; in fact, aSeqWhile and aSeqFor share a common operator builder. Loops like the two samples above could be written as follows:
```scala
@Test
def testSeqForUnit() {
  doAsync {
    expectEquals(6) {
      var s = 0
      aSeq {
        aSeqForUnit(1 to 3) {
          s += _
        }
      } thenLastI {
        s
      }
    }
  }
}

@Test
def testSeqForFold() {
  doAsync {
    expectEquals(12) {
      aSeqFor(List(1, 2, 3)) { i =>
        i * 2
      }.foldLeft(0) {
        (a, b) => aLater(a + b)
      }
    }
  }
}
```
When the loop body is too trivial to deserve its own block and you are more interested in the folding part of the loop, there is a special form just for that, which defaults the body to i => i.
```scala
@Test
def testSeqForFoldShort() {
  doAsync {
    expectEquals(6) {
      aSeqForFold(1 to 3, 0) {
        (a, i) => aLater(a + i)
      }
    }
  }
}
```
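The idea of a sequential fold with an asynchronous folding step can also be expressed with standard-library Futures. The sketch below is a rough analogy rather than the way aSeqForFold is implemented: seqFold is a hypothetical helper, and each folding step is started only after the accumulator from the previous step is available.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical illustration: standard-library Futures only, no AsyncScala.
object SeqFoldAnalogy {
  // Chains one asynchronous folding step per item. op for the next item is
  // invoked only when the Future holding the previous accumulator completes.
  def seqFold[A, B](items: Iterable[A], zero: B)(op: (B, A) => Future[B]): Future[B] =
    items.foldLeft(Future.successful(zero)) { (accF, item) =>
      accF.flatMap(acc => op(acc, item))
    }

  // Mirrors testSeqForFoldShort: sums the items asynchronously.
  def sum(items: Iterable[Int]): Int =
    Await.result(seqFold(items, 0)((a, i) => Future(a + i)), 5.seconds)

  def main(args: Array[String]): Unit = println(sum(1 to 3))
}
```

If any step fails, the remaining steps are skipped and the resulting Future carries that failure, which matches the rule that a failed iteration ends a sequential loop.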
As you can see, asynchronous sequential programming with AsyncScala is not much more difficult than synchronous sequential programming. However, asynchronous programming becomes interesting when several activities are going on in a single event loop at the same time. The next section describes operators that are used to start such activities in a structured and controlled way.