OO Programming

In the previous chapters you have seen a lot of structured and functional programming, but we have not yet touched on how to work with components. There was actually only a single two-faced promise/resolver component, with the resolver being an asynchronous component, and we have been able to do a lot just by using it. In this chapter we will discuss how to create your own components.

Note: The component model described here is also inspired by the E programming language. It was the first asynchronous component model that gave me a good gut feeling.

Functions

We will not leave functional programming just yet, since the simplest kind of object in Scala is a function. Some functions can be made asynchronous objects as well, by creating an asynchronous proxy for them. The proxy provides the following guarantees, which are based on the guarantees that the vat provides (provided that you give away only the proxy, keep the original closure to yourself, and do not give it away to other vats):

  • All invocations are done in the context of the vat for which the proxy was created.
  • There are appropriate memory barriers when sending and dispatching a message.
  • All invocations are done in the order in which they were enqueued to the vat. This gives a partial order guarantee with respect to the sender. If senders from vat A send messages a1 and a2 to the component, and senders from vat B send messages b1 and b2, then a1 will always be received before a2, and b1 will always be received before b2. However, these subsequences could interleave in any order: (a1, b1, a2, b2), (b1, a1, a2, b2), or some other combination.
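The per-sender ordering guarantee can be illustrated without AsyncScala itself. The following is a minimal sketch under stated assumptions: a single-threaded java.util.concurrent executor stands in for the vat, two plain threads stand in for senders from vats A and B, and the names OrderingSketch and demo are invented for the illustration.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable.ListBuffer

// Illustration only: a single-threaded executor plays the role of a vat.
object OrderingSketch {
  def demo(): List[String] = {
    val vat = Executors.newSingleThreadExecutor()
    val received = new ListBuffer[String] // mutated only on the "vat" thread

    // Each sender enqueues its two messages in order: name1, then name2.
    def sender(name: String): Thread = new Thread(new Runnable {
      def run(): Unit =
        for (i <- 1 to 2) vat.execute(new Runnable {
          def run(): Unit = { received += s"$name$i"; () }
        })
    })

    val a = sender("a")
    val b = sender("b")
    a.start(); b.start()
    a.join(); b.join()

    vat.shutdown() // messages that are already enqueued are still dispatched
    vat.awaitTermination(5, TimeUnit.SECONDS)
    received.toList
  }
}
```

Whatever interleaving a particular run produces, "a1" comes before "a2" and "b1" comes before "b2" in the returned list, while the relative order of the a and b messages may differ between runs.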

There are two kinds of asynchronous functions natively supported:

Oneway
Functions that return the Unit type.
Request Response
Functions that return a promise for some type.

Functions can be converted into a simple asynchronous component using one of the methods of the AsyncFunctions class. The onewayN methods convert an N-argument function to a oneway function. They do not care what the function returns and just discard the result. The exportN methods convert an N-argument function that returns a promise to a function that returns a promise as well, but the wrapped function is executed on the target vat.

  def oneway2[A1, A2](f: (A1, A2) => _): (A1, A2) => Unit = oneway2(Vat.current)(f)

  def oneway2[A1, A2](vat: Vat)(f: (A1, A2) => _): (A1, A2) => Unit = { (a1, a2) =>
    aSend(vat) {
      f(a1,a2)
    }
  }

  def export2[A1, A2, R](f: (A1, A2) => Promise[R]): (A1, A2) => Promise[R] = export2(Vat.current)(f)

  def export2[A1, A2, R](vat: Vat)(f: (A1, A2) => Promise[R]): (A1, A2) => Promise[R] = { (a1, a2) =>
    aLater(vat) {
      f(a1,a2)
    }
  }

The actual implementation in AsyncFunctions also does some trickery with tuples to reduce the number of closures created and tries to mark the resulting function with the Asynchronous marker interface. But the basic functionality is the same as above.

It is also possible to wrap a function that returns a non-promise value into a function that returns a promise with the wrapN operator. The operator works as follows:

  def wrap2[A1, A2, R](f: (A1, A2) => R): (A1, A2) => Promise[R] = wrap2(Vat.current)(f)

  def wrap2[A1, A2, R](vat: Vat)(f: (A1, A2) => R): (A1, A2) => Promise[R] = { (a1, a2) =>
    aLaterWrap(vat) {
      f(a1,a2)
    }
  }

The operator aLaterWrap (AsyncControl) always wraps the result into a promise.

With these operators it is possible to get components that are safe to use from any vat, and in the case of oneway functions even from any thread.

      @Test
      def testExport() {
        doAsync {
          expectEquals(33) {
            val s = Vat.current
            val f = export2{ (a : Int, b : Int) =>
              assertSame(s, Vat.current)
              aLater(a * b)
            }
            ActorVat.spawn {
              assertNotSame(s, Vat.current)
              f(11, 3)
            }
          }
        }
      }

As you can see, even when invoked from another vat, the function is still executed on the vat where it was created.

You can also create these functions on other vats, using the variant that takes a vat as the first argument. In that case you should not share any mutable state with the outer context. The appropriate memory barriers will happen before your function receives its first event, so immutable state is safe to use.
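For readers who want to experiment with the idea outside the framework, here is a rough analog built only on the Scala standard library. It is a sketch, not the AsyncFunctions implementation: scala.concurrent.Future stands in for the framework's promise, an ExecutionContext over a single-threaded executor stands in for the vat, and FunctionProxySketch and demo are hypothetical names. Only oneway2 and export2 mirror names from the text.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Standard-library stand-ins: Future for the promise, ExecutionContext for the vat.
object FunctionProxySketch {
  // Fire and forget: run f on the target context and discard whatever it returns.
  def oneway2[A1, A2](target: ExecutionContext)(f: (A1, A2) => Any): (A1, A2) => Unit =
    (a1, a2) => target.execute(new Runnable { def run(): Unit = { f(a1, a2); () } })

  // Request/response: run f on the target context and hand its future to the caller.
  def export2[A1, A2, R](target: ExecutionContext)(f: (A1, A2) => Future[R]): (A1, A2) => Future[R] =
    (a1, a2) => Future.unit.flatMap(_ => f(a1, a2))(target)

  def demo(): (Int, Boolean) = {
    val pool = Executors.newSingleThreadExecutor()
    val target = ExecutionContext.fromExecutor(pool)
    val caller = Thread.currentThread()
    val f = export2(target) { (a: Int, b: Int) =>
      // Report the product and whether we run on a thread other than the caller's.
      Future.successful((a * b, Thread.currentThread() ne caller))
    }
    try Await.result(f(11, 3), 5.seconds)
    finally pool.shutdown()
  }
}
```

In demo the body of the exported function runs on the pool thread rather than on the calling thread, which mirrors what testExport checks with assertSame and assertNotSame.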

Objects

Functions are a kind of one-method object, so you have probably already guessed how richer interfaces could be implemented. Now we get into OO programming, using the Queue class from the library as an example.

Interfaces

First you need to define an object interface. The interface should extend the Asynchronous marker interface, but the framework will work with your interface even without it.

The interface should define only methods that either return the Unit type or return a promise. You can still define methods of other kinds, but when you create an asynchronous proxy for this interface, an exception will be thrown.

For the queue, the interface is very simple:

/**
 * The queue
 */
trait AQueue[T] extends Asynchronous {
  /**
   * @return the item from the queue
   */
  def take: Promise[T]

  /**
   * @return when value has been put in the queue, and queue is ready for next value
   */
  def put(value: T): Promise[Unit]
}
  

We did not need oneway methods, so they are not defined here. Theoretically, put might have been a oneway method, but the promise might be needed by other implementations of the queue to limit incoming data. Oneway methods are actually quite a rare occurrence. They usually appear when you need to call your components from a non-vat context, or when nothing could be done about success or failure anyway. The resolver is an example of such a oneway function.

It is suggested to start asynchronous trait names with the prefix "A" to stress the fact that this is the asynchronous version of the trait.

Implementation Class

Implementers of an asynchronous trait should just implement that trait with a class, like the following:

class Queue[T] extends AQueue[T] {
        ....
}

Then we should define a place where enqueued elements are stored:

protected val values = new ListBuffer[T]

After that, values could be taken from the queue and put to the queue as follows:

  def take = values.remove(0)
  def put(t:T) = {values += t; ()}

But that variant would work only if there are always values to take. In synchronous non-blocking interfaces, there would have been some special value indicating that we currently do not have any value. Or we could have had some fun with futures, checking from time to time whether a value is available. But with promises we just need to promise that we will deliver the value later. For this purpose we create a list of resolvers, to keep track of the promises that we have returned.

protected val waiters = new ListBuffer[Resolver[T]]

Then the take and put methods could be reformulated as follows:

  def take: Promise[T] = {
    if (values.isEmpty) {
      val p = new Promise[T]
      waiters += p.resolver
      p
    } else {
      values.remove(0)
    }
  }

Note that the take method returns a value if one is available. If none is available, it saves the resolver in the list of waiters and returns an unresolved promise.

  def put(value: T): Promise[Unit] = {
    if (waiters.isEmpty) {
      values += value
      ()
    } else {
      success(waiters.remove(0), value)
    }
  }

The put method does the symmetric operation: if there are resolvers for unresolved promises, it resolves the oldest promise that we have previously returned. Otherwise, it just saves the value in the list of values.

Let's now consider the case when we know that no more values will be coming to the queue because of a failure in some other subsystem, and we want to notify current and future waiters.

  protected var fault: Option[Throwable] = None

  private def checkValid() {
    for (ex <- fault) {
      throw ex
    }
  }

  def abort(ex: Throwable) {
    fault = Some(ex)
    for (w <- waiters) {
      failure(w, ex)
    }
    waiters.clear()
  }

Calling the abort method notifies all pending waiters about the failure, and future requests are aborted by calling checkValid in the take and put methods.

  def take: Promise[T] = {
    checkValid()
    ....
  }

  def put(value: T): Promise[Unit] = {
    checkValid()
    ....
  }
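To see the pieces working together, here is a self-contained sketch of the same design using only the Scala standard library. It is an approximation, not the library's Queue: scala.concurrent.Promise stands in for the resolver, Future for the promise, and SketchQueue is a hypothetical name. Unlike checkValid, which throws, this sketch reports an aborted queue through a failed future. Like the Queue above, it is meant to be used from a single thread only.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Future, Promise}

// Single-threaded sketch; not safe to share between threads without a proxy.
class SketchQueue[T] {
  private val values = new ListBuffer[T]
  private val waiters = new ListBuffer[Promise[T]]
  private var fault: Option[Throwable] = None

  def take(): Future[T] = fault match {
    case Some(ex) => Future.failed(ex)
    case None if values.isEmpty =>
      // Nothing to hand out yet: remember the promise and deliver later.
      val p = Promise[T]()
      waiters += p
      p.future
    case None => Future.successful(values.remove(0))
  }

  def put(value: T): Future[Unit] = fault match {
    case Some(ex) => Future.failed(ex)
    case None =>
      if (waiters.isEmpty) values += value
      else waiters.remove(0).success(value) // resolve the oldest waiter
      Future.unit
  }

  def abort(ex: Throwable): Unit = {
    fault = Some(ex)
    waiters.foreach(_.failure(ex)) // fail everything that is still pending
    waiters.clear()
  }
}
```

A take on an empty queue returns an uncompleted future that a later put completes, and after abort both pending and new requests fail with the supplied exception.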

You could also look at ChainQueue in the sample directory to see how the queue could be implemented using a single list, with approximately the same semantics but a more compact representation.

We now have quite a useful component that can be used safely in the context of a single vat. But what if you need to use the same component from different vats?

Creating Proxy

To use a component from another context you need to create a proxy for it. As you have seen with functions, coding such a proxy is quite easy.

object Queue {
  def make[T]: AQueue[T] = make[T](Vat.current)

  def make[T](vat: Vat): AQueue[T] = export(vat, new Queue[T])

  def export[T](vat: Vat, queue: AQueue[T]): AQueue[T] = {
    new AQueue[T] {
      // Dispatch on the vat that owns the queue, not on the caller's vat.
      def put(value: T) = aLater(vat) { queue.put(value) }

      def take = aLater(vat) { queue.take }
    }
  }
}

Note that it is suggested to put the methods that create already exported instances into the companion object (Queue in this case).

But if you do not feel like doing the monkey work, you could use a slightly slower reflection-based proxy.

  def make[T]: AQueue[T] = ObjectUtil.export(classOf[AQueue[T]], new Queue)
  def make[T](vat: Vat): AQueue[T] = ObjectUtil.export(classOf[AQueue[T]], vat, new Queue)

In the future, it is planned to use an ASM-based proxy instead while keeping the API the same. So unless you need maximum performance right now, use the proxies provided by the framework. On micro-benchmarks that stress method invocation, the reflection-based proxy showed only about 15% worse results. Once overheads in other parts of the framework are fixed, such optimization will make much more sense than it does now.

In general, the component, proxy, and vat are connected to each other as follows:

[Figure: Components and Vats]

As you can see, the proxy knows about the implementation object and the vat where it enqueues events. However, the vat generally does not know anything about the objects that live in it. It references only the events enqueued to it, and only until these events are dispatched. So when all proxies of an object become unreferenced, the object can be garbage collected.
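The relationship between component, proxy, and vat can be mimicked with the standard library alone. In the sketch below, every name (ACounter, Counter, proxy, demo) is hypothetical, Future replaces the framework's promise, and an ExecutionContext over a single-threaded executor replaces the vat. The proxy is the only object that references both the implementation and the executor, while the executor itself references only the tasks currently queued on it.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

trait ACounter {
  def increment(): Future[Int]
}

// Unsynchronized implementation: correct only if touched from a single thread.
class Counter extends ACounter {
  private var n = 0
  def increment(): Future[Int] = { n += 1; Future.successful(n) }
}

object Counter {
  // Hand-written proxy: every call is re-dispatched onto the owning context.
  def proxy(vat: ExecutionContext, impl: ACounter): ACounter = new ACounter {
    def increment(): Future[Int] =
      Future.unit.flatMap(_ => impl.increment())(vat)
  }

  def demo(): Int = {
    val pool = Executors.newSingleThreadExecutor()
    val counter = proxy(ExecutionContext.fromExecutor(pool), new Counter)
    // Four caller threads hammer the proxy concurrently, 250 calls each.
    val callers = (1 to 4).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = for (_ <- 1 to 250) counter.increment()
      })
    }
    callers.foreach(_.start())
    callers.foreach(_.join())
    try Await.result(counter.increment(), 5.seconds) // call number 1001
    finally pool.shutdown()
  }
}
```

Because all increments execute on the one pool thread, no update is lost even though the implementation has no locks, and demo returns 1001.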

Mutual Exclusion

Often an asynchronous operation takes more than one turn, and it is undesirable to start the next operation before the current one finishes. In the Queue sample we have ordered operations explicitly, but there is a utility class that can take care of such bookkeeping: RequestQueue.

This class is an asynchronous counterpart of the object monitor in Java. It provides a way to order sequences of events and also an analog of wait/notify in the form of the awake/awaken methods. The class is safe to use only in the context of a single vat.

In this section we will use this class to implement a RandevuQueue. This queue differs from a normal queue in that the put method does not complete until a matching take has executed, so the value is kind of handed over to the other party. Let's consider the take functionality first:

class RandevuQueue[T] extends AQueue[T] {
  private val takes = new RequestQueue
  private var resolver: Option[Resolver[T]] = None

  ....

  def take = takes.run {
    assert(resolver == None)
    aNowR {r =>
      resolver = Some(r); puts.awake()
    }
  }
}

The take method cannot get a value on its own; the other party has to hand the value over. So we just create a promise and give the other party its resolver by saving it in the resolver variable. Since this variable can hold only one value at a time, we protect access to it with the takes.run {...} invocation. Also, if someone is already waiting on the puts queue, we notify it that the state it might be interested in has changed. It is now the turn of the other party to resolve the resolver, so that the promise returned by aNowR will resolve. The request queue will notice that the promise is resolved and proceed with the next request, and the value of the entire block will be the value of the inner block. Now let's see what put does.

  private val puts = new RequestQueue

  def put(value: T) = puts.run {
    aSeqLoop {
      resolver match {
        case Some(r) => {
          success(r, value);
          resolver = None;
          false
        }
        case None => aSeqI(puts.awaken, true)
      }
    }
  }

First we order put requests using the corresponding request queue, puts.run {...}, since we do not want a later put request to deliver its value earlier than one that arrived first. Then we start a loop that waits until a resolver appears. On each iteration we check whether a resolver has already appeared. If yes, we resolve it and exit the loop. If not, we wait until notified and continue the loop. This quite closely corresponds to SynchronousQueue in Java.

Let's now see how it works:
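The hand-over semantics can also be shown in isolation. The sketch below deliberately does not use RequestQueue or the awake/awaken mechanism. It swaps in a simpler technique, two explicit lists of pending parties, and relies only on scala.concurrent from the standard library. SketchRendezvous is a hypothetical name, and the class is meant for single-threaded use, like the other components in this chapter.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Future, Promise}

// Single-threaded sketch of rendezvous semantics with two explicit waiting lists.
class SketchRendezvous[T] {
  private val takers = new ListBuffer[Promise[T]]
  private val putters = new ListBuffer[(T, Promise[Unit])]

  def take(): Future[T] =
    if (putters.isEmpty) {
      // Nobody is offering a value yet: wait for a put.
      val p = Promise[T]()
      takers += p
      p.future
    } else {
      // Take the oldest offered value and release its putter.
      val (value, done) = putters.remove(0)
      done.success(())
      Future.successful(value)
    }

  def put(value: T): Future[Unit] =
    if (takers.isEmpty) {
      // Nobody is waiting: the put stays pending until a take arrives.
      val done = Promise[Unit]()
      putters += ((value, done))
      done.future
    } else {
      takers.remove(0).success(value)
      Future.unit
    }
}
```

A put issued before any take stays uncompleted until a take arrives, and a take issued first stays uncompleted until a put arrives. In both cases requests are served in arrival order.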

  @Test
  def randevuTest() {
    doAsync {
      val queue = RandevuQueue.make[Int]
      expectEquals(List(-1, 1, -2, 2, -3, 3, -4, 4, -5, 5)) {
        val list = new ListBuffer[Int]
        aSeq {
          aAll {
            aAllForUnit(1 to 5) {i =>
              aSeqI(queue.put(i), list += i)
            }
          } andLast {
            aAllForUnit(1 to 5) {i =>
              aWhenLast(queue.take) {q =>
                list += -q
              }
            }
          }
        }.thenLastI {
          list.toList
        }
      }
    }
  }

As you can see in the test, the take and put requests are interleaved, and take requests resolve a bit faster than put requests (this is actually not a very good test in this respect, since the order is implementation dependent). You could also play with the request queue yourself, since it is a library class. The randevu queue is almost a synchronization primitive itself, so you might want to use it to order concurrent processes.

Executing Blocking Operations

Most of the Java API is blocking or has some chance to block, particularly if some IO is involved. Things will change a bit for the better with Java 7, but there is still a lot of legacy API that blocks. If a component has to call a blocking operation, there are still a number of ways to work around it.

  1. It is possible to spawn the operation in a new vat. The method ExecutorVat.spawn is one of the possible ways to do it. It is not recommended to use ActorVat for that purpose, since actor vats are executed on a fixed thread pool, and you could exhaust it with blocking operations.
  2. Another possible way is to create a vat (for example with ExecutorVat.daemon) and use it for the blocking operations. This is the recommended way if a blocking operation needs to be run from time to time. Since vats created with ExecutorVat.daemon share a common thread pool, you would also avoid creating unneeded threads.
  3. You could also manually launch the operation on some other thread and use oneway operations (for example the resolver of some promise) to notify the asynchronous operation back in the vat.
  4. For operations with high CPU usage it is recommended to use either TaskTree or actors that notify the asynchronous process back using a resolver.
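The third option, launching the operation on another thread and reporting back through a resolver, can be sketched with the standard library. The code below is an assumption-laden analog rather than AsyncScala code: scala.concurrent.Promise plays the resolver, a cached pool of daemon threads plays the "some other thread", and BlockingSketch, runBlocking, and demo are hypothetical names.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object BlockingSketch {
  // Daemon threads, so an idle pool does not keep the JVM alive.
  private val blockingPool = Executors.newCachedThreadPool(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "blocking-sketch")
      t.setDaemon(true)
      t
    }
  })

  // Run the blocking body elsewhere and report success or failure via the promise.
  def runBlocking[T](body: => T): Future[T] = {
    val p = Promise[T]()
    blockingPool.execute(new Runnable {
      def run(): Unit = { p.complete(Try(body)); () }
    })
    p.future
  }

  // Await is used only to observe the result from plain, non-asynchronous code.
  def demo(): Int =
    Await.result(runBlocking { Thread.sleep(50); 42 }, 5.seconds)
}
```

The caller of runBlocking gets a future back immediately and never blocks its own thread. Only the pool thread sleeps.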

When communicating with components written in AsyncScala, you should never need to block to wait for another AsyncScala function or component to complete.

Invoking Components from non-Vat context

Sometimes you need to call asynchronous components from legacy code. In that case you have several choices.

  1. You could use oneway functions to invoke the asynchronous components. For example, the resolver is such a oneway function.
  2. You could create a temporary vat using the doAsync method of the AsyncControl class. It waits for its body to complete, so it effectively implements a blocking interface to non-blocking components. Note that after doAsync exits, events sent to the vat will no longer be dispatched, so do not create components in that vat that have a longer lifetime than doAsync. The doAsync method is the usual way to get a vat on the main thread, unless you use NIO.
  3. You could create a longer-living vat using ActorVat.start or ExecutorVat.daemon and communicate with it.
  4. If you need to invoke AsyncScala components from an event loop, you might want to implement a vat over your event loop. For example, there is AWTVat implemented over the AWT EventQueue, which you could use as an example.