Framework Basics

There is a lot simple asynchronous operations in AsyncScala that compose nicely. In that way, AsyncScala offers structured asynchronous programming like E programming language.

All utility functions mentioned in this section are provided in object AsyncControl. The samples provided here are from test suite, so they use asserts to specify expected results. You could locate this test in source code repository and to run this test yourself from your favorite IDE.

Starting a Vat

The Vat represents a unit of the concurrency in the framework. The Vat allows to enqueue events and later it dispatches it.

Each Vat dispatches events that were sent to it in a different way. More than one vat could be attached to the thread at different moments of time. Vats provide capabilities similar to simple Executor's framework. However they guarantee some additional properties:

  • Events for the single vat are dispatched in the order they are sent to the vat
  • The next event will be dispatched only when the previous event finishes to be dispatched
  • There will be synchronization on the writer thread when event is sent to the vat and synchronization before event is dispatched on the dispatch thread. So there are appropriate write/read barriers in place.
  • When event for the vat is being dispatched, the current value of Vat.current points to the current vat and different components created on this vat use it for receiving own events later.

Note that there are generally no stable association between vats and threads. At each moment of the time, the events from the vat can be dispatched with at most one thread. But the vat could be associated with different threads during its lifetime and tread could be used to dispatch events for different vats.

These properties have an interesting implication. It safe to use unsynchronized mutable data that is shared between different turns in the same vat provided that is made consistent at the end of the turn. The Java's executors framework does not have this property. The runnable objects that are scheduled to execute may to execute in any order and may be executed in parallel in different threads. So it is generally unsafe to use any shared mutable data without synchronization.

While consistency guarantees for vats could simplify even-loop programming, the real gain could come from building higher level operations using promises and creating components.

So run an AsyncScala asynchronous operation it is required to have a live vat that would dispatches events. This could be done in a number of ways, and the simplest way is to start a simplest vat SingleThreadVat that just dispatches events on the current thread. For the most of this guide we only need this vat to demonstrate AsyncScala features.

  @Test
  def testSimple() {
    val vat = new SingleThreadVat()
    val list = new ListBuffer[Int]
    list += 0
    vat.enqueue {() =>
      list += 2
      vat.enqueue {() =>
        list += 4
        vat.stop()
      }
      list += 3
    }
    list += 1
    vat.run()
    list += 5
    assertEquals(List(0, 1, 2, 3, 4, 5), list.toList)
  }

The following done in the example:

  1. A vat is created.
  2. An event is scheduled to vat, but not yet dispatched.
  3. Vat started with the method run run() on the current thread.
  4. The event is dispatched on the vat, and that event schedules next event.
  5. The second event is dispatched on the vat, and that event stops the vat.
  6. The vat exits from method run.

There is more on vats the Advanced Vats chapter

Promise

There is an utility method that creates SingleThreadVat and starts operation on it. The returned value used as result of doAsync operation.

  @Test
  def testDoAsync() {
    val t = doAsync { 23 }
    assertEquals(23, t)
  }

This method starts asynchronous operation specified as its body, and returns when that operation finishes. In the example above, the operation finishes immediately with the value 23. The doAsync operation returns operation result to its caller. If body fails, and exception is rethrown by doAsync operation. This operation is defined on AsyncControl object. It is recommended to import all members of this object in any source that uses AsyncScala classes. It contains a lot useful utility methods that will be discussed in this guide.

  @Test
  def testDoAsyncFail() {
    try {
      doAsync { throw new Exception("Test") }
      fail("Unreachable")
    } catch {
      case ex => assertEquals("Test", ex.getMessage)
    }
  }

However, not all operations could finish for one turn. In general, a asynchronous operation is a logical group events that are dispatched in different vats. However coordinating activities by posting events to event loops is a difficult task, since posting events is akin to "go to" in the plain synchronous programming.

To structure asynchronous operations a notion of Promise is used. A promise represents outcome of asynchronous operation and the promise could be in three states:

  • The unresolved state indicates that the asynchronous operation did not finish yet
  • Being-resolving state indicates that someone actually already taken promise resolver and thus promised to resolve promise later. And if you never resolve promise, all who are listening on it will be waiting forever.
  • The resolved state with one of two possible outcomes:
    • The Success outcome means that asynchronous operation has finished with success. This state is usually associated with some value that is result of asynchronous operation. But like there is void methods in Java, for some operation it is just needed to know that they have completed.
    • The Failure outcome meaning that operation failed. This state is always associated with some Exception that describe the failure.

The promise supports two operations:

  1. Getting resolver. This operation allows to get a resolver object that could be used to bring promise from unresolved state to either resolved or smashed state. The resolver could be invoked only once. The resolver's events are dispatched to the vat that where resolver was got. And this should be the vat where promise was created.
  2. Registering listenerThis operation allows querying promise state. If promise is resolved or smashed, the listener is notified immediately, otherwise the listener is invoked when promise becomes either resolved or smashed.

The listener's and resolvers implement the same interface, so resolver for one promise could be a listener for another promise. In that way the resolve operations could be chained together.

The promise should be used only inside the single vat, and should not be passed between vats. However it is by chaining resolvers relatively easy to create a promise in another vat that resolves after the first promise resolves. And resolvers are safe to use from any thread, even from one that is not associated with any vat. And as we see later, there are operators that help to create such chains.

The promise is very similar to the future, the difference is that it is not possible to block upon it. It is only possible to register listener, that is invoked either immediately or when promise is smashed or resolved.

The promise is a primitive unit of asynchronous control flow. It is possible to build a rich set of control flow constructs basing on promises as we will see in later chapters. In this section we will only consider the trivial operations.

  @Test
  def testPromiseSuccess() {
    val t = doAsync {
      val p = new Promise[Int]
      val resolver: (Outcome[Int]) => Unit = p.resolver
      resolver(Success(23))
      p
    }
    assertEquals(23, t)
  }

  @Test
  def testPromiseFail() {
    try {
      doAsync {
        val p = new Promise[Int]
        p.resolver(Failure(new Exception("Test")))
        p
      }
      Assert.fail("Unreachable")
    } catch {
      case ex => assertEquals("Test", ex.getMessage)
    }
  }

These examples share the following logic:

  1. The constructor is used to create unresolved promise
  2. Resolver is got from promise (note that resolver could be got only once, in order to reduce probability of double resolution bugs)
  3. A specific outcome is provided to resolver. The outcome is either Success or Failure (both are descendants of Outcome class).
  4. The event to resolve promise is posted to event queue.
  5. Not yet resolved promise is returned
  6. On the next turn resolver is dispatched and promise is resolved
  7. The operation doAsync exits depending on outcome with result or failure

As invoking resolver takes extra turn, there some trivial asynchronous operation that can create pre-resolved promises (with failure or success).

  @Test
  def testASuccess() {
    val t = doAsync {
      aSuccess(23)
    }
    assertEquals(23, t)
  }

  @Test
  def testAFailure() {
    try {
      doAsync {
        aFailure(new Exception("Test"))
      }
      fail("Unreachable")
    } catch {
      case ex => assertEquals("Test", ex.getMessage)
    }
  }

  @Test
  def testUnit() {
    val t = doAsync {
      aUnit
    }
    assertEquals((), t)
  }

Note the last test. The promises should be only used only in the context of the vat where they have been created. To other vats is safe to pass only promise resolver. However, after the promise has been resolved with immutable value, it could be safely shared and used as constant (as it is done with Unit promise in implementation of aUnit operator). As reminder, all these utility operations are implemented on AsyncControl object.

The promise could be also listened with listen operation. The listener has the same interface as resolver. So listeners for promises could be easily chained, one promise listener for another. The snippet below demonstrates this.

  @Test
  def testChain() {
    val t = doAsync {
      val start = new Promise[Int]
      val result = new Promise[Int]
      val resultResolver = result.resolver
      val intermediate  = new Promise[Int]
      val intermediateResolver = intermediate.resolver
      start.listen {
        case Failure(ex) => failure(intermediateResolver, ex)
        case Success(o) => success(intermediateResolver, o)
      }
      intermediate.listen(resultResolver)
      success(start.resolver, 42)
      result
    }
    assertEquals(42, t)
  }

The resolution process starts with resolving start promise. When this promise promise is resolver, a registered listener is invoked, and that listener notifies resolver of intermediate promise with success and failure methods. This methods are recommended ways to notify resolvers if do not know their origin, since they catch and log exception that might be generated by send operation. Note that listener is invoked in the thread where promise resolver was create. So it is safe to use mutable state from lexical scope only if you are sure that it will be at the same vat. The framework utility classes handle promises safely (unless there is a serious bug), so usually such listening is safe. If you are not sure, if your particular case is safe, use method that creates safe resolver from function closure. That resolver will always dispatch in the vat where you have created it.

  @Test
  def testChainSafe() {
    var outcome: Outcome[Int] = null
    val t = doAsync {
      val start = new Promise[Int]
      val result = new Promise[Int]
      val resultResolver = result.resolver
      val safeListener = aResolver[Int] {o =>
        outcome = o
        resolve(resultResolver, o)
      }
      start.listen(safeListener)
      success(start.resolver, 42)
      result
    }
    assertEquals(42, t)
    assertEquals(Success(42), outcome)
  }

Note that each listener is notified as many times as you have added it to the promise. Also there is no functionality to remove listeners. The listeners list is cleared automatically when the promise is resolved. After the promise is resolved with success or failure, adding new listeners causes their immediate notification. The listeners are not added to the list of listeners.

The promise is very simple concept, but it is also very powerful. All control constructs described below are implemented using it. Actually, the utility method on AsyncControl object allow program a lot, without using promises explicitly, however underneath there are a lot of promises created.

Doing it now

Sometimes it is very useful just to wrap result of exception execution into promise, independently of whether expression evaluate successfully or just fails. This allows to avoid polluting the code with multiple try catch statements. As you have probably guessed, there is an utility function just for this. This is called aNow (AsyncControl).

  @Test
  def testNowSuccess() {
    var visited = false
    val t = doAsync {
      val p = aNow {
        visited = true
        51
      }
      assertTrue(visited)
      p
    }
    assertEquals(51, t)
    assertTrue(visited)
  }

  @Test
  def testNowSuccess() {
    var visited = false
    val t = doAsync {
      val p = aNow {
        visited = true
        51
      }
      assertTrue(visited)
      p
    }
    assertEquals(51, t)
    assertTrue(visited)
  }

  @Test
  def testNowFailure() {
    var visited = false
    var visited2 = false
    val text = "Fail it!"
    try {
      doAsync {
        val p = aNow {
          visited = true
          if(visited) {
            throw new RuntimeException(text)
          }
          51
        }
        assertTrue(visited)
        visited2 = true
        p
      }
      fail("Should not be here")
    } catch {
      case ex => assertEquals(text, ex.getMessage)
    }
    assertTrue(visited2)
  }

This method is often using when build control constructs. Also there is some automation for creating promise and resolving it it later with resolver. Again it allows catching exceptions in the body, and smashing the resulting promise with it.

  @Test
  def testNowRSuccess() {
    var visited = false
    val t = doAsync {
      val p = aNowR {resolver: Resolver[Int] =>
        visited = true
        success(resolver, 51)
      }
      assertTrue(visited)
      p
    }
    assertEquals(51, t)
    assertTrue(visited)
  }

  @Test
  def testNowRFailure() {
    var visited = false
    var visited2 = false
    val text = "Fail it!"
    try {
      doAsync {
        val p = aNowR {resolver: Resolver[Int] =>
          visited = true
          if (visited) {
            throw new RuntimeException(text)
          }
          success(resolver, 51)
        }
        assertTrue(visited)
        visited2 = true
        p
      }
      fail("Should not be here")
    } catch {
      case ex => assertEquals(text, ex.getMessage)
    }
    assertTrue(visited2)
  }

Note that type of resolver is specified in the test. In most cases this is not needed, since Scala type inference works well enough. This test is probably the only place in AsyncScala code, where the type is specified explicitly. For the second test, you could see that result returned from success operator is ignored. This is because exception thrown from the body takes precedence over value supplied to resolver.

As opposite to aNow, there is aNever[T] asynchronous operation that never ends. Event infinite loop equivalents in AsyncScala are graceful and do not consume resources.

Oneway sends

The one way send operation is actually more fundamental than promise. The promises are build upon it, and resolver returned from promise, does one-way send to the vat, so the promise is resolved in the context of vat, where promise was created. Such resolution allows to avoid a lot of worried about synchronization of listeners. All listeners registered with promise are either exported resolvers, or were registered in the context of the vat where the promise was created. If you are not sure about context, where your listener will be called, use only exported version of it.

The direct way to send to the vat is to use Vat.enqueue() method.

  @Test
  def testDirectEnqueue() {
    var visited = false
    val t = doAsync {
      val p = new Promise[Int]
      val r = p.resolver
      Vat.current.enqueue {()=>
        visited = true
        success(r, 11)
      }
      p
    }
    assertEquals(11, t)
    assertTrue(visited)
  }

However, there is a shortcut for this, in a form of aSend operation (AsyncControl):

  @Test
  def testSendVat() {
    var visited = false
    val t = doAsync {
      val p = new Promise[Int]
      val r = p.resolver
      aSend(Vat.current) {
        visited = true
        success(r, 11)
      }
      p
    }
    assertEquals(11, t)
    assertTrue(visited)
  }

And since sending to the current vat is most typical usage of this construct, it could be simplified even further:

  @Test
  def testSendCurrent() {
    var visited = false
    val t = doAsync {
      val p = new Promise[Int]
      val r = p.resolver
      aSend {
        visited = true
        success(r, 11)
      }
      p
    }
    assertEquals(11, t)
    assertTrue(visited)
  }

Do it later

Another typical situation is when we want to start some asynchronous operation in other vat, or we want continue the doing something on the current turn and schedule some activity for later time on the current vat. If we are not interested in results, there is no problem. We could just use aSend operator. However in most cases we are interested in result. One of possible ways to achieve it is to use promises and resolvers (like it was done in the previous sample). However, because it is quite common task, there is an operator for this as well ( aLater from AsyncControl).

  @Test
  def testLaterVatSuccess() {
    var visited = false
    val t = doAsync {
      val p = aLater(Vat.current) {
        visited = true
        11
      }
      assertFalse(visited)
      p
    }
    assertEquals(11, t)
    assertTrue(visited)
  }

There is a variant for the current vat as well.

  @Test
  def testLaterCurrentSuccess() {
    var visited = false
    val t = doAsync {
      val p = aLater {
        visited = true
        11
      }
      assertFalse(visited)
      p
    }
    assertEquals(11, t)
    assertTrue(visited)
  }

Failures are handled gracefully from the action as well.

  @Test
  def testLaterFailure() {
    var visited = false
    val text = "Fail it!"
    try {
      doAsync {
        val p = aLater {
          visited = true
          if (visited) {
            throw new RuntimeException(text)
          } else {
            11
          }
        }
        assertFalse(visited)
        p
      }
      fail("Should not be here")
    } catch {
      case ex => assertEquals(text, ex.getMessage)
    }
    assertTrue(visited)
  }

As you have probably guessed, the this method could be implemented used the previous constructs like the following:

  def aLater[T](vat: vats.Vat)(f: => Promise[T]): Promise[T] = {
    aNowR {resolver =>
      aSend(vat) {
        aNow(f).listen(resolver)
      }
    }
  }

Back to doAsync

Now we could look back to doAsync method that we have used in this test suite extensively. It also uses some operations defined above.

  def doAsync[T](body: => Promise[T]): T = {
    var outcome: Outcome[T] = null
    val vat = new SingleThreadVat
    aSend(vat) {
      aNow(body).listen {o =>
        outcome = o
        vat.stop()
      }
    }
    vat.run()
    assert(outcome != null, "Runner should stop only after the resolution")
    outcome.force
  }

As you could see, the method does the following:

  1. it creates vat
  2. send message to it (note that vat is not yet started, but already able to enqueue messages)
  3. starts the vat.
  4. The vat dispatches the first message sent to it
  5. The body using aNow method, and a listener is registered for it.
  6. When outcome of the body becomes known, the listener remembers the result, and sends stop signal to the vat.
  7. After dispatching stop signal, the run method of SingleThreadVat quits.
  8. After that outcome is to result of execution using force method:
    • converted to value in case of success
    • exception is thrown in case of failure