In the previous chapters you have seen a lot of structured and functional programming, but we have not yet touched on how to work with components. So far there has actually been a single two-faced promise-resolver component, with the resolver being an asynchronous component, and we have been able to do a lot just by using it. In this chapter we will discuss how to create your own components.
Note: The component model described here is also inspired by the E programming language. It was the first asynchronous component model that gave me a good gut feeling.
We will not leave functional programming just yet, since the simplest kind of object in Scala is a function. A function can be made into an asynchronous object as well, by creating an asynchronous proxy for it. The proxy provides the following guarantees, which are based on the guarantees that the vat provides (provided that you hand out only the proxy to other vats and keep the original closure to yourself):
There are two kinds of asynchronous functions natively supported:
Functions can be converted into simple asynchronous components using one of the methods of the AsyncFunctions class. The onewayN methods convert an N-argument function into a oneway function. They do not care about what the function returns and simply discard the result. The exportN methods convert an N-argument function that returns a promise into a function that returns a promise as well, but the function itself is executed on the target vat.
def oneway2[A1, A2](f: (A1, A2) => _): (A1, A2) => Unit = oneway2(Vat.current)(f)

def oneway2[A1, A2](vat: Vat)(f: (A1, A2) => _): (A1, A2) => Unit = {
  (a1, a2) => aSend(vat) { f(a1, a2) }
}

def export2[A1, A2, R](f: (A1, A2) => Promise[R]): (A1, A2) => Promise[R] = export2(Vat.current)(f)

def export2[A1, A2, R](vat: Vat)(f: (A1, A2) => Promise[R]): (A1, A2) => Promise[R] = {
  (a1, a2) => aLater(vat) { f(a1, a2) }
}
The actual implementation in AsyncFunctions also does some trickery with tuples to reduce the number of closures created, and tries to mark the resulting function with the Asynchronous marker interface. But the basic functionality is the same as above.
It is also possible to wrap a function that returns a non-promise value into a function that returns a promise, using the wrapN operator. The operator works as follows:
def wrap2[A1, A2, R](f: (A1, A2) => R): (A1, A2) => Promise[R] = wrap2(Vat.current)(f)

def wrap2[A1, A2, R](vat: Vat)(f: (A1, A2) => R): (A1, A2) => Promise[R] = {
  (a1, a2) => aLaterWrap(vat) { f(a1, a2) }
}
The operator aLaterWrap (AsyncControl) always wraps the result into a promise.
With these operators it is possible to get components that are safe to use from any vat and, in the case of oneway functions, even from any thread.
@Test
def testExport() {
  doAsync {
    expectEquals(33) {
      val s = Vat.current
      val f = export2 { (a: Int, b: Int) =>
        assertSame(s, Vat.current)
        aLater(a * b)
      }
      ActorVat.spawn {
        assertNotSame(s, Vat.current)
        f(11, 3)
      }
    }
  }
}
As you can see, even when invoked from another vat, the function is still executed on the vat where it was created.
You can also create these functions on other vats, using the variant that takes the vat as the first argument. In that case you should not share any mutable state with the outer context. Before the first event is received with your function, the appropriate memory barriers will happen, so immutable state will be safe to use.
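To make the idea of exporting a function more tangible, here is a minimal sketch that does not use AsyncScala at all. It assumes Scala 2.12 or later and uses the standard scala.concurrent.Future in place of the framework's promises. MiniVat and MiniFunctions are hypothetical names invented for this illustration: MiniVat is just a single-threaded executor standing in for a vat, and the two helpers only mimic what export2 and oneway2 are described as doing above.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical stand-in for a vat: a single named thread that runs
// submitted events one at a time, in submission order.
final class MiniVat(name: String) {
  private val executor = Executors.newSingleThreadExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, name)
      t.setDaemon(true) // do not keep the JVM alive for the sketch
      t
    }
  })
  val context: ExecutionContext = ExecutionContext.fromExecutor(executor)
}

object MiniFunctions {
  // Rough analogue of export2: whoever calls the returned function,
  // the body of f runs on the thread owned by the given MiniVat.
  def export2[A1, A2, R](vat: MiniVat)(f: (A1, A2) => Future[R]): (A1, A2) => Future[R] =
    (a1, a2) => Future(f(a1, a2))(vat.context).flatten

  // Rough analogue of oneway2: the call is enqueued and the result is discarded.
  def oneway2[A1, A2](vat: MiniVat)(f: (A1, A2) => Any): (A1, A2) => Unit =
    (a1, a2) => vat.context.execute(new Runnable {
      def run(): Unit = { f(a1, a2); () }
    })
}
```

A caller on any thread can invoke the exported function and receive a future, while the closure itself only ever runs on the owning thread. This is the property that lets the closure touch state owned by that thread without extra locking.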
Functions are a kind of one-method object, so you have probably already guessed how richer interfaces could be implemented. Now we get into OO programming, using the Queue class from the library as an example.
First you need to define an object interface. The interface should extend the Asynchronous marker interface, but the framework will work with your interface even without it.
The interface should define methods that return either the unit type or a promise. You can still define methods of other kinds, but when you create an asynchronous proxy for this interface, an exception will be thrown.
For the queue, the interface is very simple:
/**
 * The queue
 */
trait AQueue[T] extends Asynchronous {
  /**
   * @return the item from the queue
   */
  def take: Promise[T]

  /**
   * @return when value has been put in the queue, and queue is ready for next value
   */
  def put(value: T): Promise[Unit]
}
We did not need oneway methods, so they are not defined here. Theoretically, put might have been a oneway method, but the promise might be needed by other implementations of the queue to limit incoming data. Oneway methods are actually quite rare. They usually occur when you need to call your components from a non-vat context, or when nothing could be done about success or failure anyway. The resolver is an example of such a oneway function.
It is suggested to start asynchronous trait names with the prefix "A" to stress the fact that this is the asynchronous version of the trait.
Implementers of an asynchronous trait should just implement that trait with a class, like the following.
class Queue[T] extends AQueue[T] { .... }
Then we should define a place where enqueued elements are stored:
protected val values = new ListBuffer[T]
After that, values can be taken from the queue and put into the queue as follows:
def take = values.remove(0)
def put(t: T) = { values += t; () }
But this variant works only if there are always values to take. In synchronous non-blocking interfaces, there would be some special value indicating that no value is currently available. Or we could have had some fun with futures, checking from time to time whether a value is available. But with promises we just need to promise that we will deliver the value later. For this purpose we create a list of resolvers to keep track of the promises that we have returned.
protected val waiters = new ListBuffer[Resolver[T]]
Then the take and put methods can be reformulated as follows:
def take: Promise[T] = {
  if (values.isEmpty) {
    val p = new Promise[T]
    waiters += p.resolver
    p
  } else {
    values.remove(0)
  }
}
Note that the take method returns a value if one is available. If none is available, it saves the resolver in the list of waiters and returns an unresolved promise.
def put(value: T): Promise[Unit] = {
  if (waiters.isEmpty) {
    values += value
    ()
  } else {
    success(waiters.remove(0), value)
  }
}
The put method does the symmetric operation: if there are resolvers for promises that are not yet resolved, it resolves the promise that was returned earliest. Otherwise, it just saves the value in the list of values.
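The interplay of the two lists can be tried out in isolation with the standard library alone. The following is a minimal sketch, not the library's Queue: it uses scala.concurrent.Promise and Future as stand-ins for the framework's promise and resolver, and SketchQueue is a hypothetical name. Like the class above, it assumes that all calls come from a single thread, so it does no locking.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Future, Promise}

// Not thread-safe by design: the sketch assumes a single owning thread,
// which is the role the vat plays for the real component.
final class SketchQueue[T] {
  private val values = new ListBuffer[T]            // values nobody has asked for yet
  private val waiters = new ListBuffer[Promise[T]]  // takes that arrived too early

  def take(): Future[T] =
    if (values.isEmpty) {
      // Nothing to hand out: remember the promise and return its pending future.
      val p = Promise[T]()
      waiters += p
      p.future
    } else {
      Future.successful(values.remove(0))
    }

  def put(value: T): Future[Unit] = {
    if (waiters.isEmpty) values += value
    else waiters.remove(0).success(value) // resolve the oldest pending take
    Future.successful(())
  }
}
```

A take on an empty queue yields a future that stays pending until the next put, and a put without waiters simply stores the value for a later take.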
Let's now consider the case when we know that no more values will be coming to the queue because of a failure in some other subsystem, and we want to notify current and future waiters.
protected var fault: Option[Throwable] = None

private def checkValid() {
  for (ex <- fault) {
    throw ex
  }
}

def abort(ex: Throwable) {
  fault = Some(ex)
  for (w <- waiters) {
    failure(w, ex)
  }
  waiters.clear()
}
Calling the abort method notifies all pending waiters about the failure, and future requests are aborted by calling checkValid at the start of the take and put methods.
def take: Promise[T] = {
  checkValid()
  ....
}

def put(value: T): Promise[Unit] = {
  checkValid()
  ....
}
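The effect of abort on pending and future requests can be sketched with the standard library only. The class below is an illustration under stated assumptions, not the library code: AbortableQueue is a hypothetical name, scala.concurrent.Promise and Future stand in for the framework's types, and instead of throwing from checkValid the sketch returns an already failed future, which is an assumption about how the failure would surface to the caller. As before, single-threaded use is assumed.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Future, Promise}

// Single-threaded sketch of a queue that can be put into a permanent failed state.
final class AbortableQueue[T] {
  private val values = new ListBuffer[T]
  private val waiters = new ListBuffer[Promise[T]]
  private var fault: Option[Throwable] = None

  // Fail every pending take and remember the fault for later requests.
  def abort(ex: Throwable): Unit = {
    fault = Some(ex)
    waiters.foreach(_.failure(ex))
    waiters.clear()
  }

  def take(): Future[T] = fault match {
    case Some(ex) => Future.failed(ex) // plays the role of checkValid()
    case None if values.nonEmpty => Future.successful(values.remove(0))
    case None =>
      val p = Promise[T]()
      waiters += p
      p.future
  }

  def put(value: T): Future[Unit] = fault match {
    case Some(ex) => Future.failed(ex) // plays the role of checkValid()
    case None =>
      if (waiters.isEmpty) values += value
      else waiters.remove(0).success(value)
      Future.successful(())
  }
}
```

After abort, a take that was already waiting fails with the given exception, and every later take or put fails immediately with the same exception.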
You can also look at ChainQueue in the sample directory to see how the queue could be implemented using a single list, with approximately the same semantics but a more compact representation.
We now have a quite useful component that can be used safely in the context of a single vat. But what if you need to use the same component from different vats?
To use a component from another context you need to create a proxy for it. As you have seen with functions, coding such a proxy is quite easy.
object Queue {
  def make[T]: AQueue[T] = make[T](Vat.current)

  def make[T](vat: Vat): AQueue[T] = export(vat, new Queue[T])

  def export[T](vat: Vat, queue: AQueue[T]): AQueue[T] = {
    new AQueue[T] {
      // forward each call to the vat that owns the queue
      def put(value: T) = aLater(vat) { queue.put(value) }
      def take = aLater(vat) { queue.take }
    }
  }
}
Note that it is suggested to put the methods that create already exported instances into the companion object (Queue in this case).
But if you do not feel like doing this monkey work, you can use a slightly slower reflection-based proxy.
def make[T]: AQueue[T] = ObjectUtil.export(classOf[AQueue[T]], new Queue)
def make[T](vat: Vat): AQueue[T] = ObjectUtil.export(classOf[AQueue[T]], vat, new Queue)
In the future, it is planned to use an ASM-based proxy instead while keeping the API the same. So unless you need maximum performance right now, use the proxies provided by the framework. On micro-benchmarks that stress method invocation, the reflection-based proxy showed only about 15% worse results. Once overheads in other parts of the framework are fixed, such an optimization will make much more sense than it does now.
In general, the component, proxy, and vat are connected to each other as follows:
As you can see, the proxy knows about the implementation object and the vat where it enqueues events. However, the vat generally does not know anything about the objects that live in it. It references only the events enqueued to it, and only until these events are dispatched. So when all proxies of an object are unreferenced, the object may be garbage collected.
Often an asynchronous operation takes more than one turn, and it is undesirable to start the next operation before the current one finishes. In the Queue sample we ordered operations explicitly, but there is a utility class that can take care of such bookkeeping: RequestQueue.
This class is an asynchronous counterpart of the object monitor in Java. It provides a way to order sequences of events and also an analog of wait/notify in the form of the awake/awaken methods. The class is safe to use only in the context of a single vat.
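The ordering half of this idea can be illustrated without the framework. The sketch below assumes Scala 2.12 or later and uses scala.concurrent.Future; SerialRunner is a hypothetical name, and the class is not RequestQueue. It covers only the "run one asynchronous body at a time, in arrival order" part and has no counterpart of awake/awaken.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Runs asynchronous bodies strictly one after another: a body is started
// only when the future returned by the previous body has completed.
final class SerialRunner(implicit ec: ExecutionContext) {
  // Future of the most recently submitted request.
  private var tail: Future[Any] = Future.successful(())

  def run[T](body: => Future[T]): Future[T] = synchronized {
    // transformWith fires whether the previous request succeeded or failed,
    // so one failed request does not block the requests queued behind it.
    val result = tail.transformWith(_ => body)
    tail = result
    result
  }
}
```

The by-name parameter matters here: the body is not evaluated at the call site but only when its turn comes, which is what keeps a later request from starting early.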
In this section we will use this class to implement a RandevuQueue. This queue differs from a normal queue in that the put method does not return until a take has executed, so the value is handed over directly to the other party. Let's consider the take functionality first:
class RandevuQueue[T] extends AQueue[T] {
  private val takes = new RequestQueue
  private var resolver: Option[Resolver[T]] = None
  ....
  def take = takes.run {
    assert(resolver == None)
    aNowR { r =>
      resolver = Some(r)
      puts.awake()
    }
  }
}
The take operation cannot get a value on its own; the other party has to hand the value over. So we just create a promise and give the other party a resolver by saving it to the resolver variable. Since this variable can hold only one value at a time, we protect access to it with the takes.run {...} invocation. Also, if someone is already waiting on the put queue, we notify them that the state they might be interested in has changed. It is now the turn of the other party to resolve the resolver, so that the promise returned by aNowR resolves. The request queue will notice that the promise is resolved and proceed with the next request, and the value of the entire block will be the value of the inner block. Now let's see what put does.
private val puts = new RequestQueue

def put(value: T) = puts.run {
  aSeqLoop {
    resolver match {
      case Some(r) => {
        success(r, value)
        resolver = None
        false
      }
      case None => aSeqI(puts.awaken, true)
    }
  }
}
First we order put requests using the corresponding request queue, puts.run {...}, since we do not want a later put request to hand over its value earlier than one that arrived first. Then we start a loop that waits until a resolver appears. On each iteration we check whether a resolver has already appeared. If yes, we resolve it and exit the loop. If not, we wait until notified and continue the loop. This quite closely corresponds to SynchronousQueue in Java.
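The hand-over behavior itself can be sketched with the standard library, using a deliberately different technique: instead of two request queues and awake/awaken, the sketch keeps one list of waiting takes and one list of waiting puts. HandOffQueue is a hypothetical name, scala.concurrent.Promise and Future stand in for the framework's types, and single-threaded use is assumed.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Future, Promise}

// Single-threaded sketch of a rendezvous: put completes only once a take
// has received the value, and take completes only once a put has supplied one.
final class HandOffQueue[T] {
  private val takers = new ListBuffer[Promise[T]]
  private val putters = new ListBuffer[(T, Promise[Unit])]

  def take(): Future[T] =
    if (putters.isEmpty) {
      // No put is waiting: park this take until one arrives.
      val p = Promise[T]()
      takers += p
      p.future
    } else {
      // Serve the oldest waiting put and tell it the hand-over happened.
      val (value, done) = putters.remove(0)
      done.success(())
      Future.successful(value)
    }

  def put(value: T): Future[Unit] =
    if (takers.isEmpty) {
      // No take is waiting: park the value together with a completion promise.
      val done = Promise[Unit]()
      putters += ((value, done))
      done.future
    } else {
      takers.remove(0).success(value)
      Future.successful(())
    }
}
```

The observable contract is the same as described above: a put issued before any take stays pending, and it completes at the moment a take picks up its value.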
Let's now see how it works:
@Test
def randevuTest() {
  doAsync {
    val queue = RandevuQueue.make[Int]
    expectEquals(List(-1, 1, -2, 2, -3, 3, -4, 4, -5, 5)) {
      val list = new ListBuffer[Int]
      aSeq {
        aAll {
          aAllForUnit(1 to 5) { i =>
            aSeqI(queue.put(i), list += i)
          }
        } andLast {
          aAllForUnit(1 to 5) { i =>
            aWhenLast(queue.take) { q =>
              list += -q
            }
          }
        }
      }.thenLastI {
        list.toList
      }
    }
  }
}
As you can see in the test, the take and put requests are interleaved, and take requests resolve a bit earlier than put requests (this is actually not a very good test in this respect, since the order is implementation dependent). You can also play with the request queue yourself, since it is a library class. The randevu queue is almost a synchronization primitive itself, so you might want to use it to order concurrent processes.
Most of the Java API is blocking or has some chance of blocking, particularly if some IO is involved. Things will change a bit for the better with Java 7, but there is still a lot of legacy API that blocks. If components have to call a blocking operation, there are still a number of ways to work around it.
When communicating with components written in AsyncScala, you should not need to block to wait for another AsyncScala function or component to complete.
Sometimes you need to call asynchronous components from legacy code. In that case you have several choices.