AsyncScala provides many simple asynchronous operations that compose nicely. In that way, AsyncScala offers structured asynchronous programming, much like the E programming language.
All utility functions mentioned in this section are provided by the AsyncControl object. The samples shown here are taken from the test suite, so they use asserts to specify expected results. You can locate these tests in the source code repository and run them yourself from your favorite IDE.
The Vat represents a unit of concurrency in the framework. A vat allows events to be enqueued and dispatches them later.
Different vats dispatch the events sent to them in different ways. More than one vat can be attached to a thread at different moments in time. Vats provide capabilities similar to Java's simple Executor framework. However, they guarantee some additional properties:
Note that there is generally no stable association between vats and threads. At any moment, the events of a vat are dispatched by at most one thread. However, a vat can be associated with different threads during its lifetime, and a thread can be used to dispatch events for different vats.
These properties have an interesting implication. It is safe to use unsynchronized mutable data that is shared between different turns in the same vat, provided that the data is made consistent by the end of each turn. Java's executor framework does not have this property: runnable objects that are scheduled for execution may run in any order and may run in parallel on different threads. So it is generally unsafe to use any shared mutable data without synchronization.
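To make the contrast with executors concrete, here is a minimal sketch that uses only the JDK, not AsyncScala. A single-thread executor is the closest stock analogue of the vat guarantee: its tasks run one at a time, so they can update a plain variable without locks. The names TurnOrderingSketch and countOnSingleThread are invented for this illustration, and a real vat differs in that it is not tied to one thread.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Illustration only: plain JDK classes, no AsyncScala API is used here.
object TurnOrderingSketch {
  // Runs `turns` tasks one after another on a single worker thread
  // and returns the final value of an unsynchronized counter.
  def countOnSingleThread(turns: Int): Int = {
    // Plain mutable state, modified only from inside the tasks.
    var counter = 0
    val executor = Executors.newSingleThreadExecutor()
    try {
      val futures = (1 to turns).map { _ =>
        executor.submit(new Runnable {
          def run(): Unit = counter += 1
        })
      }
      // Waiting on each future makes the task's writes visible to this thread.
      futures.foreach(_.get(10, TimeUnit.SECONDS))
    } finally {
      executor.shutdown()
    }
    counter
  }
}
```

With a multi-threaded pool in place of the single-thread executor, the same unsynchronized increment could lose updates, which is the situation the vat guarantees rule out.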
While the consistency guarantees of vats can simplify event-loop programming, the real gain comes from building higher-level operations using promises and from creating components.
To run an AsyncScala asynchronous operation, a live vat that dispatches events is required. This can be arranged in a number of ways; the simplest is to start the simplest vat, SingleThreadVat, which just dispatches events on the current thread. For most of this guide, this vat is all we need to demonstrate AsyncScala features.
    @Test def testSimple() {
      val vat = new SingleThreadVat()
      val list = new ListBuffer[Int]
      list += 0
      vat.enqueue {() =>
        list += 2
        vat.enqueue {() =>
          list += 4
          vat.stop()
        }
        list += 3
      }
      list += 1
      vat.run()
      list += 5
      assertEquals(List(0, 1, 2, 3, 4, 5), list.toList)
    }
The following is done in the example:
There is more on vats in the Advanced Vats chapter.
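The ordering in the testSimple example can be reproduced with a toy event loop. The sketch below is a hypothetical stand-in (MiniVat and MiniVatDemo are invented names), not the SingleThreadVat implementation. In particular, its run() returns when the queue is empty, whereas a real vat would presumably wait for further events.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a single-threaded vat; not the AsyncScala class.
final class MiniVat {
  private val queue = mutable.Queue.empty[() => Unit]
  private var stopped = false

  // Schedules an event; it runs later, inside run().
  def enqueue(event: () => Unit): Unit = queue.enqueue(event)

  // Asks run() to return once the current event has finished.
  def stop(): Unit = stopped = true

  // Dispatches queued events in FIFO order on the calling thread.
  def run(): Unit =
    while (!stopped && queue.nonEmpty) queue.dequeue().apply()
}

object MiniVatDemo {
  // Mirrors the structure of testSimple and returns the recorded order.
  def trace(): List[Int] = {
    val vat = new MiniVat
    val list = mutable.ListBuffer.empty[Int]
    list += 0
    vat.enqueue { () =>
      list += 2
      vat.enqueue { () =>
        list += 4
        vat.stop()
      }
      list += 3 // runs before the nested event: one turn finishes first
    }
    list += 1 // runs before any event: nothing is dispatched until run()
    vat.run()
    list += 5
    list.toList
  }
}
```

The nested event is only queued during the first turn, so 3 is recorded before 4 even though the nested enqueue call appears earlier in the source.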
There is a utility method that creates a SingleThreadVat and starts an operation on it. The value that the operation produces is used as the result of doAsync.
    @Test def testDoAsync() {
      val t = doAsync {
        23
      }
      assertEquals(23, t)
    }
This method starts the asynchronous operation specified as its body and returns when that operation finishes. In the example above, the operation finishes immediately with the value 23. The doAsync operation returns the operation's result to its caller. If the body fails, the exception is rethrown by doAsync. This operation is defined on the AsyncControl object. It is recommended to import all members of this object in any source file that uses AsyncScala classes, since it contains many useful utility methods that are discussed in this guide.
    @Test def testDoAsyncFail() {
      try {
        doAsync {
          throw new Exception("Test")
        }
        fail("Unreachable")
      } catch {
        case ex => assertEquals("Test", ex.getMessage)
      }
    }
However, not all operations can finish in one turn. In general, an asynchronous operation is a logical group of events that are dispatched in different vats. Coordinating activities by posting events to event loops is a difficult task, since posting events is akin to "go to" in plain synchronous programming.
To structure asynchronous operations, the notion of a Promise is used. A promise represents the outcome of an asynchronous operation, and it can be in one of three states:
The promise supports two operations:
Listeners and resolvers implement the same interface, so the resolver of one promise can be a listener of another promise. In that way, resolve operations can be chained together.
A promise should be used only inside a single vat and should not be passed between vats. However, by chaining resolvers it is relatively easy to create a promise in another vat that resolves after the first promise resolves. Resolvers are safe to use from any thread, even one that is not associated with any vat. As we will see later, there are operators that help to create such chains.
A promise is very similar to a future; the difference is that it is not possible to block on it. It is only possible to register a listener, which is invoked either immediately, if the promise already has an outcome, or when the promise is later smashed or resolved.
The promise is the primitive unit of asynchronous control flow. A rich set of control flow constructs can be built on promises, as we will see in later chapters. In this section we consider only the trivial operations.
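To show how little machinery the concept needs, here is a minimal single-threaded sketch of a promise. It is a hypothetical model, not the AsyncScala implementation: MiniPromise and MiniPromiseDemo are invented names, scala.util.Try stands in for the outcome type, the three states are assumed to be unresolved, resolved and smashed, and the resolver acts immediately instead of taking an extra turn through a vat.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.Try

// Hypothetical single-threaded promise; not the AsyncScala implementation.
final class MiniPromise[T] {
  // None = unresolved; Some(Success) = resolved; Some(Failure) = smashed.
  private var outcome: Option[Try[T]] = None
  private val listeners = ListBuffer.empty[Try[T] => Unit]

  // Registers a listener; it fires immediately if an outcome already exists.
  def listen(listener: Try[T] => Unit): Unit = outcome match {
    case Some(o) => listener(o)
    case None =>
      listeners += listener
      ()
  }

  // A resolver has the same shape as a listener, so it can be passed to listen.
  val resolver: Try[T] => Unit = { o =>
    // Assumption of this sketch: only the first outcome is kept.
    if (outcome.isEmpty) {
      outcome = Some(o)
      val pending = listeners.toList
      listeners.clear()
      pending.foreach(_.apply(o))
    }
  }
}

object MiniPromiseDemo {
  // Chains two promises: resolving `first` also resolves `second`.
  def chained(value: Try[Int]): Option[Try[Int]] = {
    val first = new MiniPromise[Int]
    val second = new MiniPromise[Int]
    first.listen(second.resolver)
    var seen: Option[Try[Int]] = None
    second.listen(o => seen = Some(o))
    first.resolver(value)
    seen
  }
}
```

Because the resolver and the listener share one function type, the chaining in chained needs no adapter: the resolver of the second promise is registered directly as a listener of the first.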
    @Test def testPromiseSuccess() {
      val t = doAsync {
        val p = new Promise[Int]
        val resolver: (Outcome[Int]) => Unit = p.resolver
        resolver(Success(23))
        p
      }
      assertEquals(23, t)
    }

    @Test def testPromiseFail() {
      try {
        doAsync {
          val p = new Promise[Int]
          p.resolver(Failure(new Exception("Test")))
          p
        }
        Assert.fail("Unreachable")
      } catch {
        case ex => assertEquals("Test", ex.getMessage)
      }
    }
These examples share the following logic:
Because invoking a resolver takes an extra turn, there are some trivial asynchronous operations that create pre-resolved promises (with a failure or a success).
    @Test def testASuccess() {
      val t = doAsync {
        aSuccess(23)
      }
      assertEquals(23, t)
    }

    @Test def testAFailure() {
      try {
        doAsync {
          aFailure(new Exception("Test"))
        }
        fail("Unreachable")
      } catch {
        case ex => assertEquals("Test", ex.getMessage)
      }
    }

    @Test def testUnit() {
      val t = doAsync {
        aUnit
      }
      assertEquals((), t)
    }
Note the last test. Promises should be used only in the context of the vat where they were created. Only a promise's resolver is safe to pass to other vats. However, after a promise has been resolved with an immutable value, it can be safely shared and used as a constant (as is done with the Unit promise in the implementation of the aUnit operator). As a reminder, all these utility operations are implemented on the AsyncControl object.
A promise can also be listened to with the listen operation. A listener has the same interface as a resolver, so promises can easily be chained, with the resolver of one promise serving as a listener of another. The snippet below demonstrates this.
    @Test def testChain() {
      val t = doAsync {
        val start = new Promise[Int]
        val result = new Promise[Int]
        val resultResolver = result.resolver
        val intermediate = new Promise[Int]
        val intermediateResolver = intermediate.resolver
        start.listen {
          case Failure(ex) => failure(intermediateResolver, ex)
          case Success(o) => success(intermediateResolver, o)
        }
        intermediate.listen(resultResolver)
        success(start.resolver, 42)
        result
      }
      assertEquals(42, t)
    }
The resolution process starts with resolving the start promise. When this promise is resolved, the registered listener is invoked, and that listener notifies the resolver of the intermediate promise using the success and failure methods. These methods are the recommended way to notify resolvers when you do not know their origin, since they catch and log any exception that might be thrown by the send operation. Note that a listener is invoked in the thread where the promise resolver was created. So it is safe to use mutable state from the lexical scope only if you are sure that it belongs to the same vat. The framework utility classes handle promises safely (unless there is a serious bug), so usually such listening is safe. If you are not sure whether your particular case is safe, use the method that creates a safe resolver from a function closure (aResolver in the next example). That resolver always dispatches in the vat where you created it.
    @Test def testChainSafe() {
      var outcome: Outcome[Int] = null
      val t = doAsync {
        val start = new Promise[Int]
        val result = new Promise[Int]
        val resultResolver = result.resolver
        val safeListener = aResolver[Int] {o =>
          outcome = o
          resolve(resultResolver, o)
        }
        start.listen(safeListener)
        success(start.resolver, 42)
        result
      }
      assertEquals(42, t)
      assertEquals(Success(42), outcome)
    }
Note that a listener is notified as many times as you have added it to the promise. There is also no functionality to remove listeners; the listener list is cleared automatically when the promise is resolved. After the promise has been resolved with a success or a failure, adding a new listener causes its immediate notification, and the listener is not added to the list.
The promise is a very simple concept, but it is also very powerful. All control constructs described below are implemented using it. In fact, the utility methods on the AsyncControl object let you program a lot without using promises explicitly; underneath, however, many promises are created.
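Similar listener rules can be observed on the standard library's scala.concurrent.Promise, which is used below purely as an analogue and is not the AsyncScala class. The sketch assumes a hypothetical same-thread execution context (sameThread) so that callbacks run directly and the counts are deterministic; ListenerRulesSketch and counts are invented names.

```scala
import scala.concurrent.{ExecutionContext, Promise}
import scala.util.Try

// Analogue built on the Scala standard library; no AsyncScala API is used.
object ListenerRulesSketch {
  // Runs callbacks directly on the calling thread, so the demo is deterministic.
  private val sameThread: ExecutionContext = new ExecutionContext {
    def execute(runnable: Runnable): Unit = runnable.run()
    def reportFailure(cause: Throwable): Unit = throw cause
  }

  // Returns the notification count before resolution, after resolution,
  // and after one more registration made once the outcome is known.
  def counts(): (Int, Int, Int) = {
    var notified = 0
    val listener: Try[Int] => Unit = _ => notified += 1
    val promise = Promise[Int]()

    // The same listener registered twice is notified twice.
    promise.future.onComplete(listener)(sameThread)
    promise.future.onComplete(listener)(sameThread)
    val before = notified

    promise.success(42)
    val afterResolve = notified

    // A listener added after resolution is notified right away.
    promise.future.onComplete(listener)(sameThread)
    (before, afterResolve, notified)
  }
}
```

Nothing is notified while the promise is unresolved, both registrations fire on resolution, and the late registration fires without any further event.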
Sometimes it is useful simply to wrap the result of evaluating an expression into a promise, regardless of whether the expression evaluates successfully or fails. This avoids polluting the code with multiple try/catch statements. As you have probably guessed, there is a utility function just for this. It is called aNow (AsyncControl).
    @Test def testNowSuccess() {
      var visited = false
      val t = doAsync {
        val p = aNow {
          visited = true
          51
        }
        assertTrue(visited)
        p
      }
      assertEquals(51, t)
      assertTrue(visited)
    }

    @Test def testNowFailure() {
      var visited = false
      var visited2 = false
      val text = "Fail it!"
      try {
        doAsync {
          val p = aNow {
            visited = true
            if(visited) {
              throw new RuntimeException(text)
            }
            51
          }
          assertTrue(visited)
          visited2 = true
          p
        }
        fail("Should not be here")
      } catch {
        case ex => assertEquals(text, ex.getMessage)
      }
      assertTrue(visited2)
    }
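The idea behind aNow, capturing either the value or the exception of an expression in one result object, has a close analogue in the standard library's scala.util.Try. The sketch below models only that capture step: the now helper and the NowSketch object are invented for illustration, the result is a Try rather than a promise, and Try catches only non-fatal exceptions.

```scala
import scala.util.{Failure, Success, Try}

// Analogue built on scala.util.Try; the real aNow returns a promise instead.
object NowSketch {
  // Evaluates `body` immediately and captures its value or its exception.
  def now[T](body: => T): Try[T] = Try(body)

  // Returns the captured success and the message of the captured failure.
  def demo(): (Try[Int], Option[String]) = {
    val ok = now { 51 }
    val failed = now[Int] { throw new RuntimeException("Fail it!") }
    val message = failed match {
      case Failure(ex) => Some(ex.getMessage)
      case Success(_)  => None
    }
    (ok, message)
  }
}
```

The caller handles both cases through one value, with no try/catch block at the call site.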
The aNow method is often used when building control constructs. There is also a variant, aNowR, that automates creating a promise and resolving it later with a resolver. Again, it catches exceptions thrown in the body and smashes the resulting promise with them.
    @Test def testNowRSuccess() {
      var visited = false
      val t = doAsync {
        val p = aNowR {resolver: Resolver[Int] =>
          visited = true
          success(resolver, 51)
        }
        assertTrue(visited)
        p
      }
      assertEquals(51, t)
      assertTrue(visited)
    }

    @Test def testNowRFailure() {
      var visited = false
      var visited2 = false
      val text = "Fail it!"
      try {
        doAsync {
          val p = aNowR {resolver: Resolver[Int] =>
            visited = true
            if (visited) {
              throw new RuntimeException(text)
            }
            success(resolver, 51)
          }
          assertTrue(visited)
          visited2 = true
          p
        }
        fail("Should not be here")
      } catch {
        case ex => assertEquals(text, ex.getMessage)
      }
      assertTrue(visited2)
    }
Note that the type of the resolver is specified in the test. In most cases this is not needed, since Scala type inference works well enough. This test is probably the only place in the AsyncScala code where the type is specified explicitly. In the second test, you can see that the result returned from the success operator is ignored. This is because an exception thrown from the body takes precedence over a value supplied to the resolver.
In contrast to aNow, there is the aNever[T] asynchronous operation, which never ends. Even the equivalents of infinite loops in AsyncScala are graceful and do not consume resources.
The one-way send operation is actually more fundamental than the promise. Promises are built upon it: the resolver returned from a promise does a one-way send to the vat, so the promise is resolved in the context of the vat where it was created. Such resolution avoids a lot of worries about the synchronization of listeners. All listeners registered with a promise are either exported resolvers or were registered in the context of the vat where the promise was created. If you are not sure about the context in which your listener will be called, use only an exported version of it.
The direct way to send to a vat is to use the Vat.enqueue() method.
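The effect of resolving through a one-way send can be sketched with plain JDK classes. In the hypothetical model below, a single-thread executor plays the role of the owning vat, and the resolver never runs the listener itself: it only posts an event to the owner. All names (OneWaySendSketch, resolveFromForeignThread, owner) are invented for this illustration and are not AsyncScala API.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Illustration only: a single-thread executor stands in for the owning vat.
object OneWaySendSketch {
  // Returns the received value and whether the listener ran on the owner thread.
  def resolveFromForeignThread(): (Int, Boolean) = {
    val owner = Executors.newSingleThreadExecutor()
    val done = new CountDownLatch(1)
    var ownerThread = ""
    var listenerThread = ""
    var received = 0

    // Listener state is plain: it is only ever written on the owner thread.
    val listener: Int => Unit = { value =>
      listenerThread = Thread.currentThread().getName
      received = value
      done.countDown()
    }
    // The resolver only posts an event; the owner dispatches it later.
    val resolver: Int => Unit = { value =>
      owner.execute(new Runnable { def run(): Unit = listener(value) })
    }

    try {
      owner.execute(new Runnable {
        def run(): Unit = { ownerThread = Thread.currentThread().getName }
      })
      // Resolve from an unrelated thread, as an external callback might.
      val foreign = new Thread(new Runnable { def run(): Unit = resolver(11) })
      foreign.start()
      // The latch also makes the owner thread's writes visible here.
      done.await(10, TimeUnit.SECONDS)
    } finally owner.shutdown()
    (received, listenerThread == ownerThread)
  }
}
```

Although the resolver is called from a foreign thread, the listener observes the value on the owner thread, which is why listeners need no synchronization of their own.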
    @Test def testDirectEnqueue() {
      var visited = false
      val t = doAsync {
        val p = new Promise[Int]
        val r = p.resolver
        Vat.current.enqueue {() =>
          visited = true
          success(r, 11)
        }
        p
      }
      assertEquals(11, t)
      assertTrue(visited)
    }
However, there is a shortcut for this in the form of the aSend operation (AsyncControl):
    @Test def testSendVat() {
      var visited = false
      val t = doAsync {
        val p = new Promise[Int]
        val r = p.resolver
        aSend(Vat.current) {
          visited = true
          success(r, 11)
        }
        p
      }
      assertEquals(11, t)
      assertTrue(visited)
    }
And since sending to the current vat is the most typical usage of this construct, it can be simplified even further:
    @Test def testSendCurrent() {
      var visited = false
      val t = doAsync {
        val p = new Promise[Int]
        val r = p.resolver
        aSend {
          visited = true
          success(r, 11)
        }
        p
      }
      assertEquals(11, t)
      assertTrue(visited)
    }
Another typical situation is when we want to start some asynchronous operation in another vat, or when we want to continue doing something in the current turn and schedule some activity for a later time on the current vat. If we are not interested in the result, there is no problem: we can just use the aSend operator. However, in most cases we are interested in the result. One possible way to obtain it is to use promises and resolvers (as was done in the previous sample). However, because this is quite a common task, there is an operator for it as well (aLater from AsyncControl).
    @Test def testLaterVatSuccess() {
      var visited = false
      val t = doAsync {
        val p = aLater(Vat.current) {
          visited = true
          11
        }
        assertFalse(visited)
        p
      }
      assertEquals(11, t)
      assertTrue(visited)
    }
There is a variant for the current vat as well.
    @Test def testLaterCurrentSuccess() {
      var visited = false
      val t = doAsync {
        val p = aLater {
          visited = true
          11
        }
        assertFalse(visited)
        p
      }
      assertEquals(11, t)
      assertTrue(visited)
    }
Failures of the action are handled gracefully as well.
    @Test def testLaterFailure() {
      var visited = false
      val text = "Fail it!"
      try {
        doAsync {
          val p = aLater {
            visited = true
            if (visited) {
              throw new RuntimeException(text)
            } else {
              11
            }
          }
          assertFalse(visited)
          p
        }
        fail("Should not be here")
      } catch {
        case ex => assertEquals(text, ex.getMessage)
      }
      assertTrue(visited)
    }
As you have probably guessed, this method can be implemented using the previous constructs, as follows:
    def aLater[T](vat: vats.Vat)(f: => Promise[T]): Promise[T] = {
      aNowR {resolver =>
        aSend(vat) {
          aNow(f).listen(resolver)
        }
      }
    }
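The same composition (send an event, evaluate the body inside it, forward the outcome to a resolver) can be modelled with standard library pieces. In the hypothetical sketch below, a plain queue stands in for the current vat and scala.concurrent.Promise stands in for the AsyncScala promise; later, drain and LaterSketch are invented names, and the body returns a plain value rather than a promise.

```scala
import scala.collection.mutable
import scala.concurrent.{Future, Promise}
import scala.util.Try

// Hypothetical model of the aLater idea; not the AsyncScala implementation.
object LaterSketch {
  // Event queue standing in for the current vat.
  private val events = mutable.Queue.empty[() => Unit]

  // Schedules `body` for a later turn and returns a future for its outcome.
  def later[T](body: => T): Future[T] = {
    val promise = Promise[T]()
    // Try captures a failing body, so the future fails instead of the loop.
    events.enqueue(() => promise.complete(Try(body)))
    promise.future
  }

  // Dispatches everything that is queued, in order.
  def drain(): Unit = while (events.nonEmpty) events.dequeue().apply()

  // Returns: visited before dispatch, visited after dispatch, final outcome.
  def demo(): (Boolean, Boolean, Option[Try[Int]]) = {
    var visited = false
    val result = later { visited = true; 11 }
    val visitedBefore = visited // the body has not run yet
    drain()
    (visitedBefore, visited, result.value)
  }
}
```

As in the tests above, the body has not run when later returns; it runs only once the queue is dispatched, and its value or exception ends up in the returned future.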
Now we can look back at the doAsync method, which we have used extensively in this test suite. It also uses some of the operations defined above.
    def doAsync[T](body: => Promise[T]): T = {
      var outcome: Outcome[T] = null
      val vat = new SingleThreadVat
      aSend(vat) {
        aNow(body).listen {o =>
          outcome = o
          vat.stop()
        }
      }
      vat.run()
      assert(outcome != null, "Runner should stop only after the resolution")
      outcome.force
    }
As you can see, the method does the following: