aPar and aParFor

Note: the control constructs described in this section will likely be redesigned in the future.

The problems described in the previous section can be avoided with some careful planning. There should simply never be more active tasks than CPUs, or than threads in a dedicated thread pool (an ATaskTree created by the TaskTree object). If we view the set of tasks as a tree, only a fixed number of tasks along the paths from the root to the leaves should be active at once. TaskTree does exactly that. The class is a close match to fork-join, although it is not yet as optimized as fork-join.
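For readers who know fork-join better than TaskTree, the idea of a task tree executed by a fixed number of workers can be sketched with the JDK fork-join framework. This is only an analogy and does not use the TaskTree API; the names SumTask and SumDemo, the threshold of 1000 and the parallelism argument are assumptions made for this illustration.

```scala
import java.util.concurrent.{ForkJoinPool, RecursiveTask}

// Analogy only: JDK fork-join, not TaskTree.
// Sums the integers in [from, until) by splitting the range in two
// until a piece is small enough to be summed directly.
class SumTask(from: Int, until: Int) extends RecursiveTask[Long] {
  override def compute(): Long =
    if (until - from <= 1000) {
      var sum = 0L
      var i = from
      while (i < until) { sum += i; i += 1 }
      sum
    } else {
      val mid = from + (until - from) / 2
      val left = new SumTask(from, mid)
      left.fork()                                    // hand the left half to the pool
      val right = new SumTask(mid, until).compute()  // compute the right half in this worker
      left.join() + right
    }
}

object SumDemo {
  // The pool is created with a fixed target parallelism, so the number
  // of tasks actively running stays bounded however deep the tree is.
  def sum(n: Int, parallelism: Int): Long = {
    val pool = new ForkJoinPool(parallelism)
    try pool.invoke(new SumTask(0, n))
    finally pool.shutdown()
  }
}
```

Calling SumDemo.sum(100000, 2) builds a tree of a few hundred tasks, but the pool is asked to keep only about two of them running at a time, which is the property this section relies on.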

Each task and subtask in the task tree has its own context (Task). This context is used to launch parallel activities, in the same way as the aPar and aParFor operators; here the corresponding methods are called fork and forkFor. In the example below, the first task is started and creates two subtasks using the fork method. One of these subtasks then uses fork again to create further subtasks.

  @Test
  def testSimple() {
    val pool = TaskTree.makeActors(2)
    doAsync {
      expectEquals((1, (2, 3))) {
        pool.start {t =>
          t.fork {st =>
            1
          } andLast {st =>
            st.fork {sst =>
              2
            }.andLast {sst =>
              3
            }
          }
        }
      }
    }
  }

The forkFor method is almost identical to its aParFor counterpart.

  @Test
  def testFor() {
    val pool = TaskTree.makeActors(2)
    doAsync {
      expectEquals((1 to 10).toList) {
        pool.start {t =>
          t.forkFor(1 to 10) {(st, i) =>
            i
          }.toList
        }
      }
    }
  }

The tasks in a task tree are supposed to be CPU-bound. If a task starts waiting for IO, no other task is activated in its place, because the task tree assumes that the task will resume its work very shortly. For this reason it is better to prepare all the needed data before starting the computation. However, there is a way to suspend the current task and let the task tree schedule other subtasks while the task waits for external resources (for example, while it reads data from the network): the external method, shown in the following example.

  @Test
  def testExternal() {
    val pool = TaskTree.makeActors(2)
    doAsync {
      expectEquals((1, (2, 3))) {
        val v = Vat.current
        pool.start {t =>
          t.fork {st =>
            1
          } andLast {st =>
            st.fork {sst =>
              sst.external {
                aLater(v) {
                  2
                }
              }
            }.andLast {sst =>
              3
            }
          }
        }
      }
    }
  }
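For comparison, the JDK fork-join framework handles blocking inside a CPU-bound pool with ForkJoinPool.managedBlock. The sketch below is an analogy, not the TaskTree implementation: managedBlock may let the pool activate a spare worker while the call blocks, whereas external is described above as suspending the task itself. The names BlockingCall, External and PairTask are assumptions made for this illustration, and Thread.sleep stands in for real IO.

```scala
import java.util.concurrent.{ForkJoinPool, RecursiveTask}
import java.util.concurrent.ForkJoinPool.ManagedBlocker

// Wraps a blocking computation so that the pool is told about the wait.
class BlockingCall[A](body: () => A) extends ManagedBlocker {
  @volatile private var result: Option[A] = None

  // Runs the blocking body once; returning true means no more blocking is needed.
  override def block(): Boolean = {
    if (result.isEmpty) result = Some(body())
    true
  }

  // Lets the pool skip block() if the result is already there.
  override def isReleasable(): Boolean = result.isDefined

  def get: A = result.get
}

object External {
  // Hypothetical helper that loosely mirrors the shape of `external`.
  def external[A](body: => A): A = {
    val call = new BlockingCall(() => body)
    ForkJoinPool.managedBlock(call)
    call.get
  }
}

// Mirrors the inner part of the example above: one subtask waits on an
// external resource and yields 2, the other simply yields 3.
class PairTask extends RecursiveTask[(Int, Int)] {
  override def compute(): (Int, Int) = {
    val slow = new RecursiveTask[Int] {
      override def compute(): Int = External.external { Thread.sleep(50); 2 }
    }
    slow.fork()
    val fast = 3
    (slow.join(), fast)
  }
}
```

In both designs the blocking part has to be marked explicitly, because the scheduler cannot tell on its own that a task has stopped using the CPU.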

Parallel Merge Sort with TaskTree

You can see how TaskTree is used in the merge sort sample in the distribution. If you run it, you will notice that it makes better use of the CPU, and that adding processors makes it faster rather than slower, as happened with the first version.
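The sample itself is not reproduced here. As a rough illustration of the divide-and-conquer shape that such a sort takes, here is a minimal sketch written against the JDK fork-join framework rather than TaskTree. The names MergeSortTask and ParallelMergeSort and the cutoff of 32 elements are assumptions, not code from the distribution.

```scala
import java.util.concurrent.{ForkJoinPool, RecursiveAction}

// Analogy only: JDK fork-join, not the TaskTree sample from the distribution.
// Sorts a(lo until hi) in place, using tmp as scratch space for merging.
class MergeSortTask(a: Array[Int], tmp: Array[Int], lo: Int, hi: Int)
    extends RecursiveAction {

  override def compute(): Unit =
    if (hi - lo <= 32) {
      java.util.Arrays.sort(a, lo, hi)   // small ranges are sorted sequentially
    } else {
      val mid = lo + (hi - lo) / 2
      val left = new MergeSortTask(a, tmp, lo, mid)
      left.fork()                                     // sort the left half in the pool
      new MergeSortTask(a, tmp, mid, hi).compute()    // sort the right half here
      left.join()
      merge(mid)
    }

  // Merges the sorted halves [lo, mid) and [mid, hi) back into a.
  private def merge(mid: Int): Unit = {
    System.arraycopy(a, lo, tmp, lo, hi - lo)
    var i = lo
    var j = mid
    var k = lo
    while (k < hi) {
      if (i < mid && (j >= hi || tmp(i) <= tmp(j))) { a(k) = tmp(i); i += 1 }
      else { a(k) = tmp(j); j += 1 }
      k += 1
    }
  }
}

object ParallelMergeSort {
  def sort(a: Array[Int], parallelism: Int): Unit = {
    val pool = new ForkJoinPool(parallelism)
    try {
      pool.invoke(new MergeSortTask(a, new Array[Int](a.length), 0, a.length))
      ()
    } finally pool.shutdown()
  }
}
```

Each level of the recursion forks one half and keeps the other in the current worker, so the amount of work running at once is governed by the pool size rather than by the number of subranges.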