OCaml-Java: overview of the concurrent library


This page documents the concurrent library that ships with the alpha version of OCaml-Java 2.0.

Warning! by default, OCaml-Java favors compatibility with the original OCaml implementation, meaning that it relies on a global runtime lock. In order to leverage the power of the concurrent library, the runtime lock has to be disabled by linking the program with the -runtime-parameter runtime-lock=off command-line option.
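
For instance, assuming the usual ocamljava compiler driver, the linking step could look as follows (the exact invocation depends on your installation and on the other libraries your program uses):

ocamljava -runtime-parameter runtime-lock=off myprog.ml -o myprog.jar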

Abstraction levels

The concurrent library packs several modules into the Concurrent module. These modules fall into nine categories of increasing abstraction:

  • basic thread manipulation (Thread, ThreadGroup, and ThreadLocal modules);
  • locks (Lock, ReadWriteLock, and Condition modules);
  • synchronization (Semaphore, CountDownLatch, CyclicBarrier, Exchanger, and Phaser modules);
  • atomic containers (see below);
  • futures (Future, ScheduledFuture, ThreadPoolExecutor, ScheduledThreadPoolExecutor, and ExecutorCompletionService modules);
  • fork/join computations (ForkJoinPool, and ForkJoin modules);
  • map/reduce computations (MapReduce module);
  • parallel operations over arrays (ParallelArray module);
  • minimalistic software transactional memory (STM module).

Besides these modules, TimeUnit defines the various time units and a conversion function.
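
As a purely hypothetical illustration (the exact name and signature of the conversion function should be checked against the ocamldoc documentation), converting between units might look like:

let seconds = TimeUnit.convert 2500L TimeUnit.Milliseconds TimeUnit.Seconds
(* assumed signature: value -> source unit -> destination unit; here 2L *)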

The doc directory of the binary distribution contains the ocamldoc-generated documentation for all modules. Most modules from the first five categories above are the counterparts of Java classes with the same name in the package java.util.concurrent (and its sub-packages). It is thus possible to get additional information from the JDK documentation.

Basics

Threads created through the Thread module of the concurrent library are akin to those of the Thread module from the original OCaml distribution (in either the systhreads or threads library). The three main differences are:

  • that threads need to be first created and then started;
  • that a thread identifier can be recycled once a thread has terminated;
  • that threads can be organized in groups which form a tree-like structure.

Thread-local storage is available through the ThreadLocal module, and locks/conditions in the POSIX tradition are available through the Lock/ReadWriteLock/Condition modules. Advanced synchronization is provided through the following modules:

  • Semaphore;
  • Exchanger for simple rendez-vous that let two threads swap data;
  • CountDownLatch for one-use barriers (see the sketch after this list);
  • CyclicBarrier for reusable barriers;
  • Phaser for customizable barriers.
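
As an illustration, here is a sketch of a one-use barrier built with CountDownLatch. It assumes that the module mirrors the Java API through make, count_down, and await functions, and that the count is an int32 (as for the pool sizes used later on this page); the exact signatures should be checked against the ocamldoc documentation.

let latch = CountDownLatch.make 3l (* opens once the count reaches zero *)

let worker x =
  Printf.printf "working on %d\n%!" x;
  CountDownLatch.count_down latch (* signal completion *)

let () =
  let threads = List.map (fun x -> Thread.make worker x) [ 1; 2; 3 ] in
  List.iter Thread.start threads;
  CountDownLatch.await latch; (* blocks until all three workers are done *)
  print_endline "all workers done"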

Here is an example of threads using a thread-local accumulator:

let acc = ThreadLocal.make 0 (* initial value *)

let compute x =
  ...
  let old = ThreadLocal.get acc in (* read access *)
  ...
  ThreadLocal.set acc (f old) (* write access *)
  ...

let () =
  let threads =
    List.map
      (fun s -> Thread.make compute s)
      [ 1; 2; 3; 5; 7; 11; 13 ] in
  List.iter Thread.start threads;
  List.iter Thread.join threads

Atomics

The atomic containers have module names that differ slightly from the equivalent Java class names; the complete mapping is given by the table below. All the atomic containers provide compare-and-set operations, making it possible to write lock-free algorithms (see the sketch after the table).

OCaml module name         Java class name
AtomicBool                AtomicBoolean
AtomicInt32               AtomicInteger
AtomicInt32Array          AtomicIntegerArray
AtomicInt64               AtomicLong
AtomicInt64Array          AtomicLongArray
AtomicMarkableReference   AtomicMarkableReference (1)
AtomicReference           AtomicReference (1)
AtomicReferenceArray      AtomicReferenceArray (1)
AtomicStampedReference    AtomicStampedReference (1)

(1): physical comparison is used by the container. As OCaml-Java uses boxed values for OCaml int values, these containers should not be used to store int values. Any other type can be safely stored (the caching of int values ensures that sum types are correctly handled).
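
For instance, the classical lock-free retry loop can be written with compare-and-set. The sketch below assumes that AtomicInt64.compare_and_set takes the expected value and the new value, and returns a boolean telling whether the update took place (as its Java counterpart does):

let counter = AtomicInt64.make 0L

(* atomically replace the stored value v by (f v), retrying on contention *)
let rec update f =
  let current = AtomicInt64.get counter in
  if not (AtomicInt64.compare_and_set counter current (f current)) then
    update f (* another thread changed the value in between: retry *)

let () = update (Int64.add 1L)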

Here is an example of atomic container use, comparing a bare Pervasives.ref value with an atomic one:

let () = Random.self_init ()

let a = AtomicInt64.make 0L (* accesses to the value are atomic *)
let b = ref 0L (* accesses to the value are not atomic *)

let print s n =
  let l = ref 0L in
  for _i = 1 to n do
    let t = Int64.of_int (Random.int 250) in
    Printf.printf "%s (waiting %Ld)\n%!" s t;
    b := Int64.add !b t;
    l := Int64.add !l t;
    ignore (AtomicInt64.add_and_get a t);
    Thread.sleep t
  done;
  Printf.printf "l = %Ld\n" !l

let () =
  let n = 10 in
  let threads =
    List.map
      (fun s -> Thread.make (print s) n)
      [ "hello"; "salut" ] in
  List.iter Thread.start threads;
  List.iter Thread.join threads;
  Printf.printf "a = %Ld, b = %Ld\n" (AtomicInt64.get a) !b

Futures

Futures are similar to lazy values in the sense that, once created, it is possible to wait for their evaluation (through Future.get). However, they differ from lazy values in that their evaluation is done in the background by another thread. Basic future manipulation is done through the Future module, but futures are created by submitting a computation to a thread pool (through submit, or one of the invoke variants), as provided by the ThreadPoolExecutor module. Such pools need to be shut down at program termination: as they contain threads, they prevent the JVM from ending the program (unless exit is explicitly called). Here is a basic example of future use:

let pool =
  ThreadPoolExecutor.make
    ~core_pool_size:4l
    ~max_pool_size:4l
    1L TimeUnit.Seconds (* keep-alive time for threads outside core *)
    RejectedExecutionHandler.Discard_policy (* how to handle rejected execution *)

let compute x =
  ...

let () =
  let f = ThreadPoolExecutor.submit pool compute 137 in
  ...
  Printf.printf "result = %d\n" (Future.get f);
  ThreadPoolExecutor.shutdown pool

It is also possible to launch several computations in parallel and wait for the first one to return:

let () =
  let l = [ 1; 2; 3; 5; 7; 11; 13 ] in
  let res = ThreadPoolExecutor.invoke_any pool (List.map compute l) in
  let others = ThreadPoolExecutor.shutdown_now pool in (* get all futures still running *)
  List.iter (fun f -> ignore (Future.cancel f true)) others;
  Printf.printf "result = %d\n" res

However, for Future.cancel to succeed, compute has to cooperate: the function should periodically test whether it has been interrupted. A simple way to perform that test is to check the value returned by Thread.interrupted before each computation step:

let compute x =
  ...
  let finished = ref false in
  while (not !finished) && (not (Thread.interrupted ())) do
    ...
    (* perform one computation step, setting finished when appropriate *)
    ...
  done;
  ...

Besides simple futures, it is also possible to use scheduled futures (through the ScheduledFuture and ScheduledThreadPoolExecutor modules). A one-shot future can be scheduled through schedule, by simply specifying a delay before evaluation. It is also possible to schedule a future that will be repeatedly evaluated with a given periodicity:

  • schedule_at_fixed_rate specifies the period between the starts of two consecutive evaluations;
  • schedule_with_fixed_delay specifies the delay between the end of one evaluation and the start of the next.

The following code will print "hello!" once per second:

let pool =
  ScheduledThreadPoolExecutor.make
    ~core_pool_size:4l
    RejectedExecutionHandler.Discard_policy

let () =
  let f =
    ScheduledThreadPoolExecutor.schedule_at_fixed_rate
      pool
      print_endline "hello!"
      1L 1L TimeUnit.Seconds (* one second before first call, one second between two calls *) in
  ScheduledFuture.get f;
  ScheduledThreadPoolExecutor.shutdown pool

Finally, the ExecutorCompletionService module allows to either poll or wait until any of the submitted futures has completed evaluation.
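
As a sketch, assuming make, submit, and take functions mirroring the Java ExecutorCompletionService API (to be checked against the ocamldoc documentation), results can be consumed in completion order:

let () =
  let ecs = ExecutorCompletionService.make pool in
  List.iter
    (fun x -> ignore (ExecutorCompletionService.submit ecs compute x))
    [ 1; 2; 3 ];
  for _i = 1 to 3 do
    let f = ExecutorCompletionService.take ecs in (* next completed future *)
    Printf.printf "result = %d\n" (Future.get f)
  done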

Fork/join computations

Fork/join computations as supported by the concurrent library are quite different from their Java counterpart, in order to provide a less general but safer abstraction. Basically, one can turn a sequential function into a parallel one by applying a very simple divide-and-conquer strategy defined by:

  • a function indicating when the passed problem (i.e. parameter value) should be split into sub-problems;
  • a function indicating how to combine the partial results of the sub-problems.

As an example, a (very inefficient) way of computing the Fibonacci function is:

let rec fib n =
  if n <= 1 then
    1
  else
    (fib (n - 2)) + (fib (n - 1))

that can be turned into a parallel version through:

let threshold = 10 (* arbitrary cut-off below which forking is not worthwhile *)

let fork n =
  if n < threshold
    then None (* below the threshold, do not fork *)
    else Some (n - 1, n - 2) (* otherwise, fork into two sub-problems *)

let join x y = x + y

let parallel_fib pool = Concurrent.ForkJoin.split pool fork join fib

where pool is a value of type ForkJoinPool.t.

The behaviour of parallel_fib when passed a value x is the following:

  1. evaluate fork x;
  2. if fork x matches None, then the result of parallel_fib x is fib x;
  3. if fork x matches Some (x1, x2), then the result of parallel_fib x is join y1 y2, where yi is the result of parallel_fib xi.

Besides the ForkJoin.split function, which is based on an option type, the ForkJoin module provides similar functions based on the list and array types. They make it possible to divide a problem into more than two sub-problems at once, possibly saving recursion steps.
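
For instance, using a hypothetical list-based variant (named ForkJoin.split_list below; the actual function name and signature should be checked against the ocamldoc documentation), the Fibonacci example could fork into any number of sub-problems at once:

(* hypothetical list-based fork: None means "solve sequentially",
   Some l means "solve the problems of l in parallel" *)
let fork n =
  if n < threshold
    then None
    else Some [ n - 1; n - 2 ]

(* combine the partial results of all the sub-problems *)
let join results = List.fold_left (+) 0 results

let parallel_fib pool = Concurrent.ForkJoin.split_list pool fork join fib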

Map/reduce computations

Map/reduce computations are a way to express a computation through a small set of functions:

  • map : input -> (key * value) list
  • combine : key -> value -> value -> value
  • reduce : key -> value -> output -> output

The computation is started by providing an input Stream.t whose values are used to launch map computations in different threads. The results computed by the threads are then stored into a map from keys to values, using combine to merge the values associated with equal keys. Once all map computations have returned, reduce acts as a bare fold over the aforementioned map, computing the final result.
Here is how to define a map/reduce computation:

module C = struct
  type input = int
  type key = int
  type value = int
  type output = int (* reduce folds the values into an int accumulator *)
  let compare_keys = Pervasives.compare

  let map x = [ x, compute x ]
  let combine _ x y = x + y
  let reduce _ v acc = v + acc
end

that is then passed to the MapReduce.Make functor:

let pool =
  ThreadPoolExecutor.make
    ~core_pool_size:4l
    ~max_pool_size:4l
    1L TimeUnit.Days
    RejectedExecutionHandler.Discard_policy

module MR = MapReduce.Make (C)

let () =
  let s = Stream.of_list [ 1; 2; 3; 5; 7; 9; 11; 13; ... ] in
  let res = MR.compute pool s 0 in
  Printf.printf "result = %d\n" res;
  ThreadPoolExecutor.shutdown pool

Parallel arrays

The ParallelArray module provides the same functions as the Array module from the standard library, except that the following ones take advantage of multiple cores to execute operations in parallel:

  • init;
  • iter and iteri;
  • fold_left and fold_right;
  • sort, stable_sort, and fast_sort.

Of course, when using one of these functions, there is no guarantee on the order in which operations will be executed.

The ParallelArray module also provides some functions with no counterpart in the Array module:

  • mem and memq;
  • exists and for_all;
  • find, find_index, and fast_find_all.

All of these functions accept two optional parameters:

  • ?pool:ThreadPoolExecutor.t, which specifies the thread pool to use;
  • ?chunk_size:int, which specifies the size of the data chunks passed to the various threads.

To get rid of these optional parameters, it is possible to use the ParallelArray.Make functor to set them once for all function calls, as sketched below.
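
A sketch of such an instantiation follows; the shape of the functor argument (a module providing the pool and the chunk size) is an assumption to be checked against the ocamldoc documentation:

module P = ParallelArray.Make (struct
  (* assumed functor argument: the pool and the chunk size to use *)
  let pool =
    ThreadPoolExecutor.make
      ~core_pool_size:4l
      ~max_pool_size:4l
      1L TimeUnit.Seconds
      RejectedExecutionHandler.Discard_policy
  let chunk_size = 1024
end)

let () =
  let a = P.init 10_000 (fun i -> i) in
  Printf.printf "sum = %d\n" (P.fold_left (+) (+) 0 a)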

Warning! most functions have the same signature as in the Array module, but this is not possible for the fold functions. Indeed, as folds are executed on parts of the array, an additional function is needed to combine the results of those partial folds. This leads to the following type for the fold_left function:

?pool:ThreadPoolExecutor.t ->
?chunk_size:int ->
('a -> 'b -> 'a) -> (* original fold function *)
('a -> 'a -> 'a) -> (* function used to combine results from partial folds *)
'a ->
'b array ->
'a

Of course, the parallel folds will yield the same results as their classical counterparts only when the passed functions are associative and commutative.

Here is an example comparing the use of Array and ParallelArray:

let size = 10000

let a = Array.init size init_func

let sequential () =
  let b = Array.map compute_func a in
  let res = Array.fold_left (fun acc elem -> acc + (aux_func elem)) 0 b in
  Printf.printf "result = %d\n" res

let parallel () =
  let b = ParallelArray.map compute_func a in
  let res = ParallelArray.fold_left (fun acc elem -> acc + (aux_func elem)) (+) 0 b in
  Printf.printf "result = %d\n" res;
  ParallelArray.shutdown_now () (* stop threads from the default ParallelArray pool *)

STM

Warning! the current implementation has only been lightly tested.

The STM module provides support for partial software transactional memory. This means that the whole memory is not protected by transactions: only values of type STM.ref are. Such values are akin to Pervasives.ref values, and are created through the STM.ref function. However, they can only be accessed from within a transaction.

Two functions run transactions:

  • STM.run, which executes any transaction;
  • STM.run_read_only, which executes a transaction that cannot modify values.

A transaction function is passed to one of these functions to specify the transaction behavior; it can access STM.ref values through the accessor functions it receives (get and set in the code below). As an example, the canonical bank account example can be written:

type account = {
    name : string; (* bare value *)
    balance : int STM.ref; (* value protected by transactions *)
  }

let make_account n b =
  { name = n; balance = STM.ref b }

let print_account acc =
  STM.run_read_only (fun get ->
    Printf.printf "%s = %d\n" acc.name (get acc.balance))

let transfer x y a =
  STM.run (fun get set ->
    let xv, yv = get x.balance, get y.balance in
    set x.balance (xv - a);
    set y.balance (yv + a))
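
These definitions can then be exercised from several threads, reusing the Thread module presented at the beginning of this page; the STM runtime guarantees that each transfer executes atomically, so no money is created or lost:

let () =
  let alice = make_account "alice" 100 in
  let bob = make_account "bob" 100 in
  let threads =
    List.map
      (fun a -> Thread.make (transfer alice bob) a)
      [ 10; 20; 30 ] in
  List.iter Thread.start threads;
  List.iter Thread.join threads;
  print_account alice; (* alice = 40 *)
  print_account bob (* bob = 160 *)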