NIO Library

Note: the functionality described in this section is likely to be redesigned in the future.

Asynchronous programming shines when it comes to IO. IO-bound programs spend most of their time waiting for external events to happen. A blocking IO API assumes that a number of parallel threads will handle this load, and each thread consumes some resources. Asynchronous operations are more costly, but over time the cost of wasting resources on blocking operations outweighs the cost of asynchronous operations. This is one of the secrets of the scalability of asynchronous server software. AsyncScala provides a nice and easy-to-use library for working with NIO sockets. The library also supports working with other kinds of streams from the standard Java library, using the approaches described in the components chapter.

The key interfaces of NIO library are the following:

  • ACloseable - a general interface that allows disposing of resources. It was added specifically for NIO, but it is now used by other pieces of code as well.
  • AInput - a generic input that allows reading data using NIO buffers.
  • AOutput - a generic output that allows writing data using NIO buffers.

These components represent a generic input stream, a generic output stream, and a generic channel. The generic parameter of these streams should be a subclass of java.nio.Buffer. There are conversions of these streams to rich input and output streams that support an additional API for the streams and some utility functions in the case of input and output character streams.
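
To give a feel for the shape of these interfaces, here is a simplified sketch inferred from how they are used in the samples below. It is not the actual library source; the real traits have more members, and the exact signatures may differ.

  // Simplified sketch only: inferred from usage in this section,
  // not the real AsyncScala declarations.
  trait ACloseable {
    def close: Promise[Unit]                 // dispose of the underlying resource
  }
  trait AInput[B <: java.nio.Buffer] extends ACloseable {
    def read(buffer: B): Promise[Int]        // elements read, or -1 on end of stream
  }
  trait AOutput[B <: java.nio.Buffer] extends ACloseable {
    def write(buffer: B): Promise[Unit]      // write the remaining buffer contents
    def flush: Promise[Unit]                 // push any buffered data downstream
  }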

Blocking Streams

Most of the Java API provides blocking input and output streams. There are adapters for them that allow reading and writing on daemon threads. Note that the Executor-based adapters actually use two vats: one vat for read/write operations and another vat for the close operation. This is because the close operation needs to be supported while a read or write is in progress. Otherwise, there is a risk of blocking when some other asynchronous operation has failed and the stream is closed at the end of an aUsing construct while it is still waiting for input. The sample below demonstrates the creation of streams over stdin and stdout. Note that the adapter based on daemon threads is used.

object EchoSampleDaemon {
  def main(args: Array[String]) {
    doAsync {
      aUsing(Adapters.daemonInput(System.in), Adapters.daemonOutput(System.out)) {(in, out) =>
        in.copyTo(out, ByteBuffer.allocate(4096), true)
      }
    }
  }
}

The copyTo operation is a generic operation over the buffer type and stream that is provided as part of the NIO library in RichInput. It is quite straightforward, and it is not much different from the variant that someone would have written in the synchronous case (input here is the current, wrapped stream).

  def copyTo(out: AOutput[B], buffer: B, autoFlush: Boolean): Promise[Long] = {
    var length = 0L
    aSeq {
      var flushed = aUnit
      aSeqLoop {
        aWhenLast(input.read(buffer)) {n =>
          if (n == -1) {
            false
          } else {
            length += n
            buffer.flip
            aWhenLastI(aSeqI(flushed, out.write(buffer))) {
              util.compact(buffer)
              if (autoFlush) {
                flushed = out.flush
              }
              true
            }
          }
        }
      }
    }.thenLastI {
      length
    }
  }

The only thing worth mentioning is the autoFlush option. The flush is not waited for before trying the next read, so the flush could actually happen at the same time as the read. However, the next write operation will start only after the flush has finished, and if we get an exception from the flush, the write will not be attempted. Such implicit parallelism is easy to achieve with asynchronous programming, but with a synchronous API it would have required explicit thread management and non-trivial coordination.

Utility Streams: Nul, Pipes, Digesting

Those who are in urgent need of a /dev/null emulator will be glad to find NulInput, NulOutput, and NulChannel. These classes are actually used in some tests.

The pipe is a bit more useful class, and there is a generic abstract pipe implementation over any buffer type. To get a type-specific implementation, use the companion object.

And then there are components that greatly simplify writing tests for IO components: DigestingInput and DigestingOutput. Like their JDK counterparts, they compute a hash of the data that passes through them, so it is possible to check that you got what you have written. This makes it possible to write randomized tests like the test that was used to test the pipe (this test actually uncovered some bugs in it). RichByteInput and RichByteOutput allow creating a digest wrapper utility object that provides different digesting options for the stream, so generally you do not need to use these objects directly.

  @Test
  def testPipe() {
    doAsync {
      val pipe = BufferedPipe.bytePipe(4)
      val inputHash = new Promise[Array[Byte]]
      val outputHash = new Promise[Array[Byte]]
      val N = 3
      val rnd = new Random()
      aSeq {
        aAll {
          aUsing(aWhenLast(pipe.input)(_.digest(inputHash.resolver).md5)) {in =>
            in.discard(ByteBuffer.allocate(6))
          }
        }.andLast {
          aUsing(aWhenLast(pipe.output)(_.digest(outputHash.resolver).md5)) {out =>
            val b = ByteBuffer.allocate(5)
            aSeqForUnit(0 to N - 1) {i =>
              rnd.nextBytes(b.array());
              b.position(0).limit(b.capacity())
              out.write(b)
            }
          }
        }
      }.thenI {
        aAll(inputHash, outputHash)
      }.thenLast {
        case (ih, oh) => Assert.assertArrayEquals(ih, oh)
      }
    }
  }

The non-matching values in the test were used just to detect more errors. At the end of the test, only the hashes need to be compared rather than the actual stream values.

In future versions, it is planned to add BufferedInput and BufferedOutput support, but there is no immediate pressure to do it. If you want to buffer more data, just pass larger buffers to the read and write methods; asynchronous IO is better done in bulk portions anyway. If you are interested in some other utility class, post a request to the bug tracker.
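
For example, the echo sample above could simply allocate a larger buffer for the copy; the 64 KiB size below is only an illustrative value, not a recommendation from the library.

  // Same copy loop as in the echo sample, just with a larger (64 KiB) buffer,
  // so each asynchronous read/write moves data in bigger chunks.
  aUsing(Adapters.daemonInput(System.in), Adapters.daemonOutput(System.out)) {(in, out) =>
    in.copyTo(out, ByteBuffer.allocate(64 * 1024), true)
  }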

Text IO

Text IO is also non-blocking in AsyncScala, and it relies on the Java CharsetEncoder and CharsetDecoder classes. The classes that provide this functionality can be created using the DecoderInput and EncoderOutput objects. The sample below demonstrates how to decode and encode input and output streams using the default charset. There are more overloads of the encode and decode methods that allow tuning the encoders and decoders and using any charset you want. But unless you want precise control over the encoding and decoding process, it is simpler to use the overloads of the asText method on RichByteInput and RichByteOutput rather than constructing these streams directly using the DecoderInput and EncoderOutput objects.

object EchoEncodeDecode {
  def main(args: Array[String]) {
    doAsync {
      aUsing(Adapters.daemonOutput(System.out).asText) {out =>
        Adapters.daemonInput(System.in).asText.lines.seqFold(1) {(i, l) =>
          aSeqI(out.println("" + i + ": " + l), out.flush, i + 1)
        }
      }
    }
  }
}

Note that several rich wrappers over the IO objects are used. First, the binary input is converted to a text input, and a line stream is created with the lines method. Then this stream is iterated with the seqFold method, which folds the stream visiting one element at a time. The folding is trivial: it just counts lines and, as a side effect, writes numbered input lines to standard output. The lines themselves are also written using a rich wrapper over the output stream; the original interface only supports writing NIO CharBuffers.

Also, the encoder output buffers the data passed to it, so a flush is needed to force the data to the underlying stream.

NIO Sockets over Selectors

The classes described above are nice to have, but the NIO support was created primarily to provide a usable wrapper over non-blocking NIO streams. And the result is almost as simple to use as normal blocking streams.

Sockets are created using the ASocketFactory interface. Currently, only a single implementation of this interface is provided, but other implementations are certainly possible, for example ones based on NIO.2 asynchronous channels or even on synchronous sockets.
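
For orientation, the factory shape as it is used in the following samples looks roughly like the sketch below. The exact signatures (for example, whether a socket is returned directly or via a promise) are an assumption and may differ from the real interface.

  // Rough sketch only, inferred from the factory.makeServerSocket and
  // f.makeSocket calls later in this section; not the actual AsyncScala
  // declarations.
  trait ASocketFactory {
    def makeSocket: ASocket             // an unconnected client socket
    def makeServerSocket: AServerSocket // an unbound server socket
  }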

One of the ways to create a socket factory right now is SelectorVat, which has a run method that starts the special vat on the current thread and passes the factory to it. The special vat is needed to host sockets because non-blocking sockets have to be associated with selectors. This vat is supposed to be used mostly for sockets, but other classes can use it as well.

  SelectorVat.run {factory =>
    // some code that uses socket factory
  }

It is also possible to use the SelectorVat.startDaemon method that starts a SelectorVat on some background thread and returns the created socket factory.

  doAsync {
    aWhenLast(SelectorVat.startDaemon) {factory =>
      // some code that uses socket factory
    }
  }

Writing a Server

After we have got hold of the socket factory, we can start a server. As the first step, an AServerSocket is created. The server socket is created unbound by the socket factory, so it should be bound before use. Then the traditional accept/serve loop is started.

class EchoServer(factory: ASocketFactory, port: Int, bufferSize: Int, backlog: Int, twoBuffers: Boolean) {
  def runServer: Promise[Unit] = {
    aUsing(factory.makeServerSocket) {server =>
      aSeq {
        server.bind(new InetSocketAddress(port), backlog)
      }.thenLastI {
        aSeqLoop {
          aWhenLast(server.accept) {c =>
            serveClient(c)
            aSuccess(true)
          }
        }
      }
    }
  }

  def serveClient(c: ASocket) {
     ....
  }

}

Note that we do not need to wait for the client to complete before we start accepting the next client. Also, we do not need to react to client failures; they can just be logged. So the client is started by a method that returns the unit type. Such a method is somewhat similar to fork() in UNIX programming.

  var counter = 0
  var active = 0
  val otherVat = ExecutorVat.daemon

  def serveClient(c: ASocket) {
    val start = System.currentTimeMillis
    val startActive = active
    active += 1
    val n = counter
    counter += 1
    aSend(Vat.current) {
      aSeq {
        aUsing(c.input, c.output) {(in, out) =>
          if (twoBuffers) {
            in.copyTo2(out, ByteBuffer.allocate(bufferSize), ByteBuffer.allocate(bufferSize))
          } else {
            in.copyTo(out, ByteBuffer.allocate(bufferSize))
          }
        }
      }.then {l =>
        val end = System.currentTimeMillis
        val text = "SUCCESS\t" + n + "\t" + (end - start) + "\t" + startActive + "\t" + (active - 1) + "\t" + l
        aLater(otherVat) {
          println(text)
          aUnit
        }
      }.failed {
        case ex =>
          ex.printStackTrace()
          val end = System.currentTimeMillis
          val text = "FAILURE\t" + n + "\t" + (end - start) + "\t" + "\t" + startActive + "\t" + (active - 1) + ex.toString
          aLater(otherVat) {
            println(text)
            aUnit
          }
      }.finallyDo {
        active -= 1
        c.close
      }
    }
  }

The serveClient method does not contain anything special. It just sends an event to the current vat and then copies the input stream to the output stream provided by the socket (the copyTo2 variant presumably uses two buffers so that a read and a write can overlap). The biggest part of this method is gathering and printing statistical information. Note that because a blocking print operation is used, it is done on another vat.

Writing a Client

Now, let's start a client that will spam the server with connections. We have already seen almost all the pieces needed for the client. ASocket is a byte channel with two additional methods: one sets options, and the other connects an unconnected socket to some address. We just create sockets in parallel. But if all sockets attempt to connect at the same time, there will just be a lot of failures, since the OS is able to handle only a limited number of connect requests. So the connect operation is protected by an asynchronous version of a Semaphore.

  def main(args: Array[String]) {
    val PORT = 22222
    val BUFFER_SIZE = 4096
    val HOST = "localhost"
    val LENGTH = 1024L * 101 + 11
    val CONNECTS = 40
    val CONNECTIONS = 10000
    val random = new SecureRandom
    val address = new InetSocketAddress(HOST, PORT)
    var counter = 0
    val otherVat = BatchedActorVat.start
    SelectorVat.run {f =>
      val connects = new Semaphore(CONNECTS)
      val appStart = System.currentTimeMillis
      val p = aAllForUnit(0 to CONNECTIONS - 1) {n =>
        aWhenLastI(connects.acquire) {
          aUsing(f.makeSocket) {s =>
            val n = counter
            counter += 1
            var start = 0L
            var connect = 0L
            aSeq {
              start = System.currentTimeMillis
              val p = s.connect(address)
              p.listen {_ =>
                connect = System.currentTimeMillis
                connects.release()
              }
              p
            }.thenI {
              val digestInput = new Promise[Array[Byte]]
              val digestOutput = new Promise[Array[Byte]]
              aAll {
                aUsing(aWhenLast(s.output)(_.digest(digestOutput.resolver).sha1)) {out =>
                  generateRandom(random, LENGTH, out, ByteBuffer.allocate(BUFFER_SIZE))
                }
              }.and {
                aUsing(aWhenLast(s.input)(_.digest(digestInput.resolver).sha1)) {in =>
                  in.discard(ByteBuffer.allocate(BUFFER_SIZE))
                }
              }.and {
                digestOutput
              }.andLast {
                digestInput
              }
            }.then {r =>
              val (ol, il, oh, ih) = r
              val end = System.currentTimeMillis
              val text = "SUCCESS\t" + n + "\t" + (start - appStart) + "\t" + (end - appStart) + "\t" + (connect - start) + "\t" + (end - start) + "\t" + (ol == il && java.util.Arrays.equals(oh, ih)) + "\t" + ol
              aLater(otherVat) {
                println(text)
                aUnit
              }
            }.failedLast {
              case ex =>
                val end = System.currentTimeMillis
                val text = "FAILURE\t" + n + "\t" + (start - appStart) + "\t" + (end - appStart) + "\t" + (connect - start) + "\t" + (end - start) + "\t" + ex.toString
                aLater(otherVat) {
                  println(text)
                  ex.printStackTrace()
                  aUnit
                }
            }
          }
        }
      }
      p.listen(_ => otherVat.stop())
      p
    }
  }

The client uses randomized data for testing, and we check the hashes of the input and output here, so if there actually is a problem, it will be detected.

  def generateRandom(random: Random, length: Long, out: AByteOutput, buffer: ByteBuffer): Promise[Long] = {
    assert(buffer.hasArray)
    var generated = 0L
    aSeq {
      aSeqLoop {
        buffer.clear()
        random.nextBytes(buffer.array)
        buffer.limit((length - generated).intValue.min(buffer.capacity))
        generated += buffer.remaining
        aSeqI(out.write(buffer), generated < length)
      }
    }.thenLastI {
      generated
    }
  }