Sequential Flow

Listening for promises works, and it is possible to do a lot with just listeners and promises. However, promise represents outcome of asynchronous operation, and thinking about promises and composing them is somewhat low level fun. This section starts describing operators that compose actual asynchronous operations rather than promises. Asynchronous operators in AsyncScala framework take asynchronous operations as inputs, and produce promise that represents outcome of composite asynchronous operation.

Scala is a rich language, and it provides very nice means to express a custom control flow in form of closures. And AsyncScala framework makes very heavy use of them. Most of closures that are used for asynchronous control flow, represent an asynchronous operation, and as such return a promise. We actually have already seen the trivial operator that applies to asynchronous operation in previous section, it is aLater operator. This operator takes a body, that returns a promise and executes it in other vat, or in the same vat later, and it returns a promise.

However, there are other useful asynchronous operations in the framework as well. In this section we will discuss sequential control operators.

When operator

The operator aWhen is one of most useful operators in the framework. Like most of the basic operators (including rest of operators discussed here) it is defined on AsyncControl object. It allows expressing data flow dependencies between asynchronous operations. Its complete form is demonstrated by the following sample.

import collection.mutable.ListBuffer
import net.sf.asyncobjects.asyncscala._
import AsyncControl._

/**
 * The demonstration for when construct
 */
object WhenSample {
  def main(args: Array[String]) {
    val r = doAsync {
      val a = new ListBuffer[Int]
      aWhen(aSuccess(1)) {r =>
        println("Input: " + r)
        a += r
        a += 2
        aLater(3)
      }.then {r =>
        println("From resolved body: " + r)
        a += r
        throw new RuntimeException("test")
      }.thenI {
        a += -1 // never executes
        a // needed for type inference, as type of entire construct is the type of last then body or resolved body
      }.failed {case ex =>
        println("Exception: " + ex)
        a += 4
        a
      }.finallyDo {
        println("Executing finally action")
        a += 5
      }
    }
    println("When result: " + r)
  }
}

This program prints the following:

Input: 1
From resolved body: 3
Exception: java.lang.RuntimeException: test
Executing finally action
When result: ListBuffer(1, 2, 3, 4, 5)

There are the following stages in running this sample:

  1. The first thing to note that result of aWhen operator is a promise. In the sample, this promise is used by doAsync operator to determine when to exit from the event loop.
  2. The first argument to aWhen operator is an asynchronous operation, that is executed when aWhen operator starts.
  3. If the operation is successful, the control is passed to the next argument of aWhen operator, and a value produced by the first operation is passed to the next block.
  4. Then aWhen operator waits when that blocks finishes, in our example it finishes successfully as well, and passes control to the next block added to aWhen operator by then method.
  5. And the next block demonstrates what happens when some asynchronous operation in aWhen operator fails. This block fails with exception.
  6. Since the previous asynchronous operation failed, the last body added by thenI is never executed. Note, that thenI is a short form of then method. It accept blocks without arguments, and it is used when the next operation is only interested in the fact, that the previous operation completed successfully, by exact result is irrelevant. The previous operation returns promise for Nothing and we just cannot get nothing as input. If exception was not thrown, the result of this block would have been a result of the entire operation, but the block just contains unreachable code.
  7. Since one of operations failed, the control is passed to the block added by failed method. Throwable with which some operation failed is passed to that block as argument. And like Scala catch, it is a partial function, so you should use case expression to handle errors, if exception is not handled, it is passed further to outer scope. The block itself might be successful or fail. In this sample it finishes successfully with a list as result. The result of this block will be used as total result of the operator.
  8. But before this operator completes, there is one piece of the code left to execute. It is block specified by finallyDo. This block is always executed at the end of aWhen operator, whether it is successful or failed with exception. If asynchronous operation specified as argument to finallyDo will fail with exception, this operation will be used as failure outcome for the entire aWhen operator, just like exception thrown in finally block of try/catch statement in Java. As finallyDo is the last method invoked on WhenBuidler utility object, it returns a promise for the entire operator.
  9. Then the entire operation completes successfully with list as result, and

This was a full form of aWhen operator. And there is a number of shortcuts.

Last block: aWhenLast, thenLast, failedLast

The full form of aWhen operator ends with finallyDo. But often, only some parts of aWhen operator are actually needed. For that purpose, it is possible to use Last suffix to specify the fact that no other bocks are expected. The sample below demonstrates aWhenLast operator. The samples are taken from test suit. The expectEquals method checks if the result asynchronous operation match expected. We will see how this operator is implemented later in this section.

  @Test
  def testWhenLast() {
    doAsync {
      expectEquals(3) {
        aWhenLast(aSuccess(2)) {r =>
          r + 1
        }
      }
    }
  }

The operator aWhenLast is often used in expression form like the following:

  @Test
  def testWhenLastShort() {
    doAsync {
      expectEquals(3) {
        aWhenLast(aSuccess(2))(_ + 1)
      }
    }
  }

If only aWhen and then parts of aWhen are needed, there is thenLast method.

  @Test
  def testThenLast() {
    doAsync {
      expectEquals(5L) {
        aWhen(aSuccess(2)) {r =>
          r + 1L
        }.thenLast {
          _ + 2
        }
      }
    }
  }

The operator aWhen might also end with failedLast part as in the following test. This is method is used when there is no need for finallyDo block.

  @Test
  def testFailedLast() {
    doAsync {
      expectEquals(-1) {
        aWhen(2) {r =>
          r / (r - 2)
        }.failedLast {
          case _ => -1
        }
      }
    }
  }

The exception will be handled by aWhen operator even if is thrown from expression in parenthesis.

  @Test
  def testHeadFailedLast() {
    doAsync {
      expectEquals(-1) {
        val t = 2
        aWhen(t / (t - 2)) {r =>
          r * r
        } failedLast {
          case _ => -1
        }
      }
    }

Also note, that failed and failedLast use partial functions, and not handled exception are passed to outer resolvers.

  @Test
  def testPartialCatch() {
    doAsync {
      expectFailure(classOf[ArithmeticException]) {
        aWhen(aSuccess(0)) {
          1 / _
        }.failedLast {
          case x : IllegalArgumentException => 1
        }
      }
    }
  }

This method also demonstrates usage of "expectFailure" method, that will be discussed later sections.

The suffix Last is also typical way to end building operators and get promise for other operator builders aAll, aAny, aPar, and others.

aSeq

The first shortcut is aSeq form. The operator aWhen wants an asynchronous expression as the first argument. However, the first expression sometimes is quite big and it would looks somewhat strange in that place. It is also possible that there is no need for two asynchronous operations tied together, and one of them has to be empty. In that case aSeq operator is used. This operator also uses the same operator builder and aWhen operator, and aSeq {seqBody} is equivalent toaWhen(aUnit){_ => seqBody}. Note, that there is no aSeqLast form, since such form would be just equivalent for aLater operator.

One of previous samples could be written as the following using aSeq operator.

  @Test
  def testSeqFailedLast() {
    doAsync {
      expectEquals(-1) {
        aSeq {
          val r = 2
          r / (r - 2)
        }.failedLast {
          case _ => -1
        }
      }
    }

Ignored inputs: aWhenI, aWhenLastI, thenI, thenLastI

Sometimes, it is important that the previous block has finished, but it is not required to know exact result of such block. The typical case is when promise for Unit type is returned. Even more, there are cases when it is not possible to get a value of execution of previous block. It is when promise for Nothing is returned by asynchronous operation. For such cases, it is possible use form with suffix I (ignored input). If both I and Last, suffixes need to be specified, the I suffix go after Last suffix.

  @Test
  def testWhenLastI() {
    doAsync {
      expectEquals(23) {
        aWhenLastI(aUnit) {
          aLater{
            23
          }
        }
      }
    }
  }

  @Test
  def testWhenThenI() {
    doAsync {
      expectFailure(classOf[IllegalStateException]) {
        aWhenI(aUnit) {
          aLater(aUnit)
        } thenI {
          throw new IllegalStateException()
        } thenLastI {
          aFailure(new RuntimeException())
        }
      }
    }
  }

Expression forms: aSeqI and aSeqE

In many classes, it is needed to wait for some operation that returns promise for unit or other unimportant value, and after that immediately proceed with other asynchronous operation or just yield value. For that case, there is aSeqI two and three argument operator:

  @Test
  def testSeqI() {
    doAsync {
      expectEquals(true) {
        aSeqI(aLater(aUnit), true)
      }
    }
  }

There is also expression form of aSeq operator that has dependencies between arguments.

  @Test
  def testSeqESimple() {
    doAsync {
      expectEquals(23) {
        aSeqE(aSuccess(2))(_ * 10 + 3)
      }
    }
  }

It is practically equivalent to aWhenLst operator, but there is an interesting twist. If the first argument is already resolved, it starts the second argument immediately, rather than on next turn as aWhenLast operator does.

  @Test
  def testSeqE() {
    doAsync {
      expectEquals(List(1, 2, 3)) {
        val l = new ListBuffer[Int]
        aSeq {
          l += 1
          val p = aSeqE(aSuccess(2)) {r =>
            l += 2
          }
          l += 3
          p
        } thenLastI {
          l.toList
        }
      }
    }
  }

Back to expectEquals and expectFailure

We have used methods expectEquals and expectFailure extensively in this chapter. These methods are also implemented using aSeq operator. And their implementation is not that different from how one would have implemented using synchronous programming.

  def expectEquals[T](v: T)(body: => Promise[T]): Promise[T] = {
    aWhenLast(body) {r =>
      Assert.assertEquals(v, r)
      r
    }
  }

  def expectFailure[T](t: Class[_ <: Throwable])(body: => Promise[T]): Promise[Throwable] = {
    var failed: Boolean = true
    aSeq {
      body
    } thenI {
      failed = false
      Assert.fail("The body should have failed with " + t.getName)
      null.asInstanceOf[Throwable]
    } failedLast {
      case f if (failed && f.getClass == t) => aSuccess(f)
    }
  }

Note that the line null.asInstanceOf[Throwable] is just to help Scala type inference. This line is actually unreachable.

Sequential Loops

Since almost any asynchronous operator creates a new promise and use operations passed as arguments to get result for it, it is dangerous to create tail recursive loops that create promises, without extending compiler and language. It only possible to create loops that return voids. However, repeating something is very common in programming, so framework provides quite a few loop operators. This section focuses on sequential loops. In sequential loops, the next iteration starts only when previous is complete. If iteration failed, the loop finishes with that failure.

aSeqLoop

This is a simplest loop form, and one of most often used one. Many other loop forms are based upon it. This loop operator takes a closure as argument that returns a promise for boolean. If promise resolves to true, the body is executed again, if false, the loops exits. The loop operator itself returns a promise for unit, and it is resolved with unit value when its execution finishes successful. If body of loop fails with exception, the loop fails with the same exception as well.

  @Test
  def testSeqLoop() {
    doAsync {
      expectEquals(6) {
        var s = 0
        aSeq {
          var i = 0
          aSeqLoop {
            i = i + 1
            if (i > 3) {
              false
            } else {
              aLater {
                s += i; true
              }
            }
          }
        } thenLastI {
          s
        }
      }
    }
  }

aSeqWhile and aSeqWhileUnit

The previous form looks very similar to Java {{{do {} while()}}} statement. There is a form that is similar to normal wile operator.

  @Test
  def testSeqWhile() {
    doAsync {
      expectEquals(6) {
        var s = 0
        aSeq {
          var i = 0
          aSeqWhileUnit(i < 3) {
            i = i + 1
            s += i
          }
        } thenLastI {
          s
        }
      }
    }
  }

You have possibly already noticed strange "Unit" suffix here. This suffix presents because there are other forms of aSeqWhile operators. The form without suffix performs folding values returned from loop body. And using folding form, the loop above could be rewritten as the following:

  @Test
  def testSeqWhileFold() {
    doAsync {
      expectEquals(6) {
        var i = 0
        aSeqWhile(i < 3) {
          i = i + 1
          i
        }.foldLeft(0) {
          (a, b) => aLater(a + b)
        }
      }
    }
  }

Note, that strictly speaking, the body of foldLeft operation is asynchronous operation as well. And it is executed after body of the loop finishes and before next iteration started. A quite useless for this case aLater operator is added to demonstrate this.

The operator aSeqWhile has also some standard folding operators toList and toListBuffer that return promise for List and ListBuffer respectively.

  @Test
  def testSeqWhileList() {
    doAsync {
      expectEquals(List(1, 2, 3)) {
        var i = 0
        aSeqWhile(i < 3) {
          i = i + 1
          i
        }.toList
      }
    }
  }

aSeqFor

There are also aSeqFor that allow sequential iteration over types that extend Scala Iterable[_] or Java'sIterable[_]. This method also support all folding constructs that are supported by aSeqWhile operator, and actually aSeqWile and aSeqFor share a common operator builder. And two samples above could be rewritten as the following:

  @Test
  def testSeqForUnit() {
    doAsync {
      expectEquals(6) {
        var s = 0
        aSeq {
          aSeqForUnit(1 to 3) {
            s += _
          }
        } thenLastI {
          s
        }
      }
    }
  }

  @Test
  def testSeqForFold() {
    doAsync {
      expectEquals(12) {
        aSeqFor(List(1, 2, 3)) {
          i => i * 2
        }.foldLeft(0) {
          (a, b) => aLater(a + b)
        }
      }
    }
  }

And in case when loop body is too trivial to deserve an own block, and when you are more interested in folding part of the loop. There is a special form just for that, that defaults body toi => i.

  @Test
  def testSeqForFoldShort() {
    doAsync {
      expectEquals(6) {
        aSeqForFold(1 to 3, 0) {
          (a, i) => aLater(a + i)
        }
      }
    }
  }

As you can see, the asynchronous sequential programming with AsyncScala is not much more difficult than synchronous sequential programming. However, asynchronous programming becomes interesting, when there are several activities going on the single even loop at the same time. And the next section describes operators that are used to start such activities in structured and controlled way.