1. Programming Model

This section shows how the COMPSs programming model is used to develop a Java task-based parallel application for distributed computing. First, we introduce the structure of a COMPSs Java application with a simple example. Then, we provide a complete guide on how to define the application tasks. Finally, we show special API calls and other optimization hints.

1.1. Application Overview

A COMPSs application is composed of three parts:

  • Main application code: the code that is executed sequentially and contains the calls to the user-selected methods that will be executed by the COMPSs runtime as asynchronous parallel tasks.
  • Remote methods code: the implementation of the tasks.
  • Task definition interface: a Java annotated interface that declares the methods to be run as remote tasks, along with the metadata the runtime needs to schedule them properly.

The main application file name must match the name of the main class and start with a capital letter; in this case it is Simple.java. The Java annotated interface file is named after the application plus Itf.java, in this case SimpleItf.java. The code that implements the remote tasks is defined in the application name plus Impl.java file, in this case SimpleImpl.java.

All code examples are in the /home/compss/tutorial_apps/java/ folder of the development environment.

Main application code

In COMPSs, the user’s application code is kept unchanged: no API calls need to be included in the main application code to run the selected tasks on the nodes.

The COMPSs runtime is in charge of replacing the invocations of the user-selected methods with the creation of remote tasks, also taking care of file accesses where required. Let’s consider the Simple application example, which takes an integer as input parameter and increases it by one unit.

The main application code of Simple application is shown in the following code block. It is executed sequentially until the call to the increment() method. COMPSs, as mentioned above, replaces the call to this method with the generation of a remote task that will be executed on an available node.

Code 7 Simple in Java (Simple.java)
package simple;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import simple.SimpleImpl;

public class Simple {

  public static void main(String[] args) {
    String counterName = "counter";
    int initialValue = Integer.parseInt(args[0]); // args[0] is a String

    //--------------------------------------------------------------//
    // Creation of the file which will contain the counter variable //
    //--------------------------------------------------------------//
    try {
       FileOutputStream fos = new FileOutputStream(counterName);
       fos.write(initialValue);
       System.out.println("Initial counter value is " + initialValue);
       fos.close();
    }catch(IOException ioe) {
       ioe.printStackTrace();
    }

    //----------------------------------------------//
    //           Execution of the program           //
    //----------------------------------------------//
    SimpleImpl.increment(counterName);

    //----------------------------------------------//
    //    Reading from an object stored in a File   //
    //----------------------------------------------//
    try {
       FileInputStream fis = new FileInputStream(counterName);
       System.out.println("Final counter value is " + fis.read());
       fis.close();
    }catch(IOException ioe) {
       ioe.printStackTrace();
    }
  }
}
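To picture what replacing the call with a remote task means, here is a plain-Java analogy using a CompletableFuture. It is not how COMPSs works internally (the real runtime intercepts the calls itself and ships them to workers); the class AsyncSketch, the method submitIncrement and the in-memory counter standing in for the counter file are all hypothetical. The point is the ordering: the call returns immediately, and the main code only has to wait when it reads the data the task produces.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class AsyncSketch {
    // In-memory stand-in for the content of the "counter" file (an assumption of this sketch)
    static final AtomicInteger counter = new AtomicInteger();

    // Task body: same effect as SimpleImpl.increment, applied to the in-memory counter
    static void increment() {
        counter.incrementAndGet();
    }

    // Conceptually what happens to the call: it is submitted and control returns at once
    static CompletableFuture<Void> submitIncrement(ExecutorService workers) {
        return CompletableFuture.runAsync(AsyncSketch::increment, workers);
    }

    static int run(int initialValue) {
        counter.set(initialValue);
        ExecutorService workers = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Void> task = submitIncrement(workers);
            // The main code may continue here while the task runs elsewhere.
            // Reading the counter needs its latest value, so wait for the task first.
            task.join();
            return counter.get();
        } finally {
            workers.shutdown();
        }
    }
}
```

For example, AsyncSketch.run(5) returns 6, mirroring the "Initial counter value is 5" / "Final counter value is 6" output of the Simple application.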

Remote methods code

The following code contains the implementation of the remote method of the Simple application that will be executed remotely by COMPSs.

Code 8 Simple Implementation (SimpleImpl.java)
package simple;

import  java.io.FileInputStream;
import  java.io.FileOutputStream;
import  java.io.IOException;
import  java.io.FileNotFoundException;

public class SimpleImpl {
  public static void increment(String counterFile) {
    try{
      FileInputStream fis = new FileInputStream(counterFile);
      int count = fis.read();
      fis.close();
      FileOutputStream fos = new FileOutputStream(counterFile);
      fos.write(++count);
      fos.close();
    }catch(FileNotFoundException fnfe){
      fnfe.printStackTrace();
    }catch(IOException ioe){
      ioe.printStackTrace();
    }
  }
}
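Because the main code contains no COMPSs API calls, Simple and SimpleImpl also run as an ordinary sequential Java program, which is handy for checking the task logic before deploying it. The sketch below reproduces that flow in one self-contained class; the class name LocalSimpleRun and the use of a temporary file instead of "counter" are assumptions for illustration. Note that FileOutputStream.write(int) stores only the lowest byte of its argument, so this counter format can only hold values from 0 to 255.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class LocalSimpleRun {
    // Same logic as SimpleImpl.increment: read one byte, add one, write it back
    static void increment(String counterFile) throws IOException {
        int count;
        try (FileInputStream fis = new FileInputStream(counterFile)) {
            count = fis.read();
        }
        try (FileOutputStream fos = new FileOutputStream(counterFile)) {
            fos.write(++count); // only the lowest 8 bits are written
        }
    }

    // Same flow as Simple.main, executed sequentially without the COMPSs runtime
    static int run(int initialValue) {
        try {
            File counter = File.createTempFile("counter", null);
            counter.deleteOnExit();
            String counterName = counter.getPath();
            try (FileOutputStream fos = new FileOutputStream(counterName)) {
                fos.write(initialValue);
            }
            increment(counterName);
            try (FileInputStream fis = new FileInputStream(counterName)) {
                return fis.read();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

LocalSimpleRun.run(3) returns 4, while run(255) wraps around to 0 because of the single-byte format.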

Task definition interface

This Java interface is used to declare the methods to be executed remotely along with Java annotations that specify the necessary metadata about the tasks. The metadata can be of three different types:

  1. For each parameter of a method, the data type (e.g., File, primitive types, String or Object; the full list is given in Section 1.2.2) and its direction (IN, OUT, INOUT, COMMUTATIVE or CONCURRENT).
  2. The Java class that contains the code of the method.
  3. The constraints that a given resource must fulfill to execute the method, such as the number of processors or main memory size.
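The directions in item 1 are what allow the runtime to detect data dependencies between tasks. The following simplified model illustrates the idea; it is not the COMPSs implementation. It assumes that write-after-read and write-after-write conflicts are removed by versioning (renaming) the data, so that only read-after-write dependencies remain, and it covers only IN, OUT and INOUT. The names DependencySketch, Access and dependencies are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DependencySketch {
    enum Direction { IN, OUT, INOUT }

    // One parameter access: which datum (e.g. a file name) and how the task uses it
    static final class Access {
        final String data;
        final Direction direction;

        Access(String data, Direction direction) {
            this.data = data;
            this.direction = direction;
        }
    }

    // For each task, in submission order, the indices of the earlier tasks it must wait for
    static List<List<Integer>> dependencies(List<List<Access>> tasks) {
        Map<String, Integer> lastWriter = new HashMap<>();
        List<List<Integer>> result = new ArrayList<>();
        for (int t = 0; t < tasks.size(); t++) {
            List<Integer> deps = new ArrayList<>();
            for (Access a : tasks.get(t)) {
                // IN and INOUT read the datum, so they depend on its last writer
                if (a.direction != Direction.OUT && lastWriter.containsKey(a.data)) {
                    int writer = lastWriter.get(a.data);
                    if (!deps.contains(writer)) {
                        deps.add(writer);
                    }
                }
            }
            for (Access a : tasks.get(t)) {
                // OUT and INOUT produce a new version of the datum
                if (a.direction != Direction.IN) {
                    lastWriter.put(a.data, t);
                }
            }
            result.add(deps);
        }
        return result;
    }
}
```

Two successive increment calls on the same file (INOUT) form a chain in which the second waits for the first, whereas a task that only reads an unrelated file has no predecessor.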

The task description interface of the Simple example is shown in Code 9. It describes the metadata of the increment() method, which has a single parameter: a String containing the path to the counter file, declared as Type.FILE with direction INOUT because the task both reads and updates it. In this example there are also constraints on the minimum number of processors (computingUnits = "1") and the minimum memory size (memorySize = "0.3", i.e. 0.3 GB) needed to run the method.

Code 9 Interface of the Simple application (SimpleItf.java)
package simple;

import  es.bsc.compss.types.annotations.Constraints;
import  es.bsc.compss.types.annotations.task.Method;
import  es.bsc.compss.types.annotations.Parameter;
import  es.bsc.compss.types.annotations.parameter.Direction;
import  es.bsc.compss.types.annotations.parameter.Type;

public interface SimpleItf {

  @Constraints(computingUnits = "1", memorySize = "0.3")
  @Method(declaringClass = "simple.SimpleImpl")
  void increment(
      @Parameter(type = Type.FILE, direction = Direction.INOUT)
      String file
  );

}
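The interface itself only carries metadata; the runtime uses it to find, for each declared method, the class that actually implements it. The sketch below mimics that lookup with plain Java reflection. To stay self-contained it declares a hypothetical stand-in annotation, MethodSketch, instead of the real es.bsc.compss.types.annotations.task.Method; unlike the real annotation, whose declaringClass is a String, the stand-in takes a Class literal to avoid class-name resolution details. It also invokes the implementation locally rather than as a remote task. CounterItf, CounterImpl and TaskLookupSketch are illustrative names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical stand-in for the COMPSs @Method annotation (takes a Class, not a String)
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface MethodSketch {
    Class<?> declaringClass();
}

// Hypothetical task definition interface, playing the role of SimpleItf
interface CounterItf {
    @MethodSketch(declaringClass = CounterImpl.class)
    int increment(int value);
}

// Hypothetical implementation class, playing the role of SimpleImpl
class CounterImpl {
    public static int increment(int value) {
        return value + 1;
    }
}

class TaskLookupSketch {
    // Finds the annotated method in the interface, resolves its declaring class and runs it locally
    static Object invoke(Class<?> itf, String methodName, Object... args) {
        for (Method declared : itf.getDeclaredMethods()) {
            MethodSketch meta = declared.getAnnotation(MethodSketch.class);
            if (meta == null || !declared.getName().equals(methodName)) {
                continue;
            }
            try {
                Method impl = meta.declaringClass().getMethod(methodName, declared.getParameterTypes());
                return impl.invoke(null, args); // static method: no target object
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot invoke " + methodName, e);
            }
        }
        throw new IllegalArgumentException("No task named " + methodName + " in " + itf.getName());
    }
}
```

TaskLookupSketch.invoke(CounterItf.class, "increment", 5) returns 6: the interface names the method and its declaring class, and the implementation class provides a static method with the same name and parameters.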

The following sections provide a detailed guide on how to implement more complex applications.

1.2. Task definition reference guide

The task definition interface is a Java annotated interface in which developers declare tasks as annotated methods. Annotations can be of four different types:

  1. Task-definition annotations are method annotations that indicate which type of task a method declared in the interface is.
  2. The Parameter annotation provides metadata about the task parameters, such as data type, direction and other properties for runtime optimization.
  3. The Constraints annotation describes the minimum capabilities that a given resource must fulfill to execute the task, such as the number of processors or main memory size.
  4. The scheduler hint annotation provides information about how to deal with tasks of this type at scheduling and execution time.

Each kind of annotation is described in detail below.

1.2.1. Task-definition Annotations

For each declared method, developers have to define a task type. The following list enumerates the possible task types:

  • @Method: Defines the Java method as a task

    • declaringClass (Mandatory) String specifying the class that implements the Java method.
    • targetDirection This field specifies the direction of the target object of an object method. It can be defined as: “INOUT” (default value) if the method modifies the target object, “CONCURRENT” if this object modification can be done concurrently, or “IN” if the method does not modify the target object.
    • priority “true” if the task takes priority and “false” otherwise. This parameter is used by the COMPSs scheduler (it is a String not a Java boolean).
    • onFailure Expected behaviour if the task fails. OnFailure.RETRY (default value) makes the task be executed again, OnFailure.CANCEL_SUCCESSORS ignores the failure and cancels the successor tasks, OnFailure.FAIL stops the whole application in a safe mode once a task fails, and OnFailure.IGNORE ignores the failure and continues with normal runtime execution.
  • @Binary: Defines the Java method as a binary invocation

    • binary (Mandatory) String defining the full path of the binary that must be executed.
    • workingDir Full path of the binary working directory inside the COMPSs Worker.
    • priority “true” if the task takes priority and “false” otherwise. This parameter is used by the COMPSs scheduler (it is a String not a Java boolean).
  • @MPI: Defines the Java method as an MPI invocation

    • mpiRunner (Mandatory) String defining the mpi runner command.
    • binary (Mandatory) String defining the full path of the binary that must be executed.
    • processes String defining the number of MPI processes spawned in the task execution. This can be combined with the constraints annotation to define an MPI+OpenMP task. (Default is 1)
    • scaleByCU It indicates that the defined processes will be scaled by the computingUnits defined in the constraints, so the total number of MPI processes is processes multiplied by computingUnits. This is used to group MPI processes per node: the number of groups is set in processes and the number of processes per node is given by computingUnits.
    • workingDir Full path of the binary working directory inside the COMPSs Worker.
    • priority “true” if the task takes priority and “false” otherwise. This parameter is used by the COMPSs scheduler (it is a String not a Java boolean).
  • @OmpSs: Defines the Java method as an OmpSs invocation

    • binary (Mandatory) String defining the full path of the binary that must be executed.
    • workingDir Full path of the binary working directory inside the COMPSs Worker.
    • priority “true” if the task takes priority and “false” otherwise. This parameter is used by the COMPSs scheduler (it is a String not a Java boolean).
  • @Service: Mandatory. It specifies the service properties.

    • namespace Mandatory. Service namespace
    • name Mandatory. Service name.
    • port Mandatory. Service port.
    • operation Operation type.
    • priority “true” if the service takes priority, “false” otherwise. This parameter is used by the COMPSs scheduler (it is a String not a Java boolean).
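As a worked example of the scaleByCU rule for @MPI tasks: with processes = "2" and computingUnits = "4", enabling scaleByCU launches 2 × 4 = 8 MPI processes, organised as 2 groups of 4 processes (one group per node); without scaleByCU the task spawns 2 processes. The helper below only restates that arithmetic with int values (the annotation fields themselves are Strings); MpiProcessSketch and its methods are illustrative, not part of the COMPSs API.

```java
import java.util.Arrays;

class MpiProcessSketch {
    // Total number of MPI processes: scaled by computingUnits only when scaleByCU is set
    static int totalProcesses(int processes, int computingUnits, boolean scaleByCU) {
        return scaleByCU ? processes * computingUnits : processes;
    }

    // With scaleByCU: 'processes' groups of 'computingUnits' processes each (one group per node)
    static int[] processesPerGroup(int processes, int computingUnits) {
        int[] groups = new int[processes];
        Arrays.fill(groups, computingUnits);
        return groups;
    }
}
```

For the values above, totalProcesses(2, 4, true) is 8, totalProcesses(2, 4, false) is 2, and processesPerGroup(2, 4) is {4, 4}.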

1.2.2. Parameter-level annotations

For each parameter of a task (method declared in the interface), the user must include a @Parameter annotation. Its properties are:

  • Direction: Describes how a task uses the parameter (Default is IN).

    • Direction.IN: Task only reads the data.
    • Direction.INOUT: Task reads and modifies the data.
    • Direction.OUT: Task completely modifies the data, or its previous content (or any data left unmodified) is not important.
    • Direction.COMMUTATIVE: An INOUT usage of the data which can be re-ordered with other executions of the defined task.
    • Direction.CONCURRENT: The task allows concurrent modifications of this data. It requires a storage backend that manages concurrent modifications.
  • Type: Describes the data type of the task parameter. By default, the runtime infers the type according to the Java datatype. However, it is mandatory to define it for files, directories and Streams.

    COMPSs supports the following types for task parameters:

    • Basic types: To indicate that a parameter is a Java primitive type, use the following types: Type.BOOLEAN, Type.CHAR, Type.BYTE, Type.SHORT, Type.INT, Type.LONG, Type.FLOAT, Type.DOUBLE. They can only have IN direction, since primitive types in Java are always passed by value.
    • String: To indicate a parameter is a Java String use Type.STRING. It can only have IN direction, since Java Strings are immutable.
    • File: The real Java type associated with a file parameter is a String that contains the path to the file. However, if the user specifies a parameter as Type.FILE, COMPSs will treat it as a file. It can have any direction (IN, OUT, INOUT, COMMUTATIVE or CONCURRENT).
    • Directory: The real Java type associated with a directory parameter is a String that contains the path to the directory. However, if the user specifies a parameter as Type.DIRECTORY, COMPSs will treat it as a directory. It can have any direction (IN, OUT, INOUT, COMMUTATIVE or CONCURRENT).
    • Object: An object parameter is defined with Type.OBJECT. It can have direction IN, INOUT, COMMUTATIVE or CONCURRENT.
    • Streams: A task parameter can be defined as a stream with Type.STREAM. It can have direction IN, if the task pulls data from the stream, or OUT, if the task pushes data to the stream.
  • Return type: Any object or a generic class object. In this case the direction is always OUT. Basic types are also supported as return types. However, we do not recommend using them because they cause an implicit synchronization.

  • StdIOStream: For non-native tasks (binaries, MPI, and OmpSs) COMPSs supports the automatic redirection of the Linux streams by specifying StdIOStream.STDIN, StdIOStream.STDOUT or StdIOStream.STDERR. Notice that any parameter annotated with the stream annotation must be of type Type.FILE, with direction Direction.IN for StdIOStream.STDIN, or Direction.OUT or Direction.INOUT for StdIOStream.STDOUT and StdIOStream.STDERR.

  • Prefix: For non-native tasks (binaries, MPI, and OmpSs) COMPSs allows prepending a constant String to the parameter value, so that joined prefix-value arguments (for example, an option and its value written as a single Linux command-line argument) can be passed to the binary execution.

  • Weight: Provides a hint of the size of this parameter compared to a default one. For instance, if a parameter is 3 times larger than the others, set the weight property of this parameter to 3.0. (Default is 1.0).

  • keepRename: The runtime renames files to avoid some data dependencies. This is transparent to the final user because the file is renamed back to its original name when the task is invoked on the worker. This management creates an overhead; if developers know that the task is sensitive neither to the file name nor to its extension (i.e., it can work with the renamed file), they can set this property to true to reduce the overhead.
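The rule that primitive and String parameters can only have IN direction follows from plain Java semantics, which the short program below demonstrates: assigning to a parameter inside a method never changes the caller's variable, and String methods return new objects instead of modifying the original. A value that a task must produce therefore has to travel back through a return value or a FILE or OBJECT parameter. The class and method names are illustrative.

```java
class ByValueSketch {
    // Reassigning a primitive parameter only changes the method's local copy
    static void tryToIncrement(int value) {
        value++;
    }

    // Strings are immutable: concat returns a new String and the argument is untouched;
    // reassigning the parameter only rebinds the local reference
    static void tryToAppend(String text) {
        text.concat("!");
        text = text + "?";
    }

    static int demoInt() {
        int counter = 41;
        tryToIncrement(counter);
        return counter; // still 41
    }

    static String demoString() {
        String greeting = "hello";
        tryToAppend(greeting);
        return greeting; // still "hello"
    }
}
```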

1.2.3. Constraints annotations

  • @Constraints: The user can specify the capabilities that a resource must have in order to run a method. For example, in a cloud execution the COMPSs runtime creates a VM that fulfils the specified requirements in order to perform the execution. A full description of the supported constraints can be found in Table 14.

1.2.4. Scheduler annotations

  • @SchedulerHints: It specifies hints for the scheduler about how to treat the task.

    • isReplicated “true” if the method must be executed in all the worker nodes when invoked from the main application (it is a String not a Java boolean).
    • isDistributed “true” if the method must be scheduled in a forced round robin among the available resources (it is a String not a Java boolean).
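To make the isDistributed hint concrete, the sketch below shows a forced round-robin placement of successive task instances over a fixed list of workers. The class RoundRobinSketch and the worker names are hypothetical, and the real COMPSs scheduler also has to respect constraints and resource availability, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.List;

class RoundRobinSketch {
    private final List<String> workers;
    private int next = 0;

    RoundRobinSketch(List<String> workers) {
        if (workers.isEmpty()) {
            throw new IllegalArgumentException("at least one worker is required");
        }
        this.workers = new ArrayList<>(workers);
    }

    // Assigns the next task to the next worker in the list, wrapping around at the end
    String assign() {
        String worker = workers.get(next);
        next = (next + 1) % workers.size();
        return worker;
    }
}
```

With three workers, four consecutive tasks go to worker1, worker2, worker3 and then worker1 again.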

1.3. Alternative method implementations

Since version 1.2, the COMPSs programming model allows developers to define sets of alternative implementations of the same method in the Java annotated interface. Code 10 depicts an example where the developer sorts an integer array using two different methods: merge sort and quick sort that are respectively hosted in the packagepath.Mergesort and packagepath.Quicksort classes.

Code 10 Alternative sorting method definition example
@Method(declaringClass = "packagepath.Mergesort")
@Method(declaringClass = "packagepath.Quicksort")
void sort(
    @Parameter(type = Type.OBJECT, direction = Direction.INOUT)
    int[] array
);

As depicted in the example, the name and parameters of all the implementations must coincide; the only difference is the class where the method is implemented. This is reflected in the attribute declaringClass of the @Method annotation. Instead of stating that the method is implemented in a single class, the programmer can define several instances of the @Method annotation with different declaring classes.
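For the declaration in Code 10 to be valid, both classes must expose a method with exactly the same name and parameters. A minimal sketch of the two implementations follows; it omits the packagepath package so that it compiles as a single file, and both methods sort in place so that the INOUT direction of the array is meaningful. The algorithms are standard textbook versions, not code taken from COMPSs. Note that this merge sort allocates auxiliary arrays, which is in line with its higher memory needs.

```java
import java.util.Arrays;

class Mergesort {
    // Sorts the array in place (INOUT): merged results are written back into the argument
    public static void sort(int[] array) {
        if (array.length < 2) {
            return;
        }
        int mid = array.length / 2;
        int[] left = Arrays.copyOfRange(array, 0, mid);
        int[] right = Arrays.copyOfRange(array, mid, array.length);
        sort(left);
        sort(right);
        int i = 0, j = 0, k = 0;
        while (i < left.length && j < right.length) {
            array[k++] = (left[i] <= right[j]) ? left[i++] : right[j++];
        }
        while (i < left.length) {
            array[k++] = left[i++];
        }
        while (j < right.length) {
            array[k++] = right[j++];
        }
    }
}

class Quicksort {
    // Same name and parameters as Mergesort.sort; also sorts in place
    public static void sort(int[] array) {
        quicksort(array, 0, array.length - 1);
    }

    private static void quicksort(int[] a, int lo, int hi) {
        if (lo >= hi) {
            return;
        }
        int pivot = a[(lo + hi) >>> 1];
        int i = lo, j = hi;
        while (i <= j) {
            while (a[i] < pivot) i++;
            while (a[j] > pivot) j--;
            if (i <= j) {
                int tmp = a[i];
                a[i] = a[j];
                a[j] = tmp;
                i++;
                j--;
            }
        }
        quicksort(a, lo, j);
        quicksort(a, i, hi);
    }
}
```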

Like independent remote methods, a set of equivalent implementations might have common restrictions to be fulfilled by the resource hosting the execution, and each implementation can also have its own specific constraints. Through the @Constraints annotation, developers can specify the common constraints for a whole set of methods. In the following example (Code 11), only one core is required to run either sorting algorithm.

Code 11 Alternative sorting method definition with constraint example
@Constraints(computingUnits = "1")
@Method(declaringClass = "packagepath.Mergesort")
@Method(declaringClass = "packagepath.Quicksort")
void sort(
    @Parameter(type = Type.OBJECT, direction = Direction.INOUT)
    int[] array
);

However, these sorting algorithms have different memory consumption, so each algorithm might require a specific amount of memory, which should be stated in the implementation constraints. For this purpose, the developer can add a @Constraints annotation inside each @Method annotation containing the specific constraints for that implementation. Since mergesort has a higher memory consumption than quicksort, Code 12 sets a requirement of 1 core and 2 GB of memory for the mergesort implementation and 1 core and 500 MB of memory for the quicksort implementation.

Code 12 Alternative sorting method definition with specific constraints example
@Constraints(computingUnits = "1")
@Method(declaringClass = "packagepath.Mergesort", constraints = @Constraints(memorySize = "2.0"))
@Method(declaringClass = "packagepath.Quicksort", constraints = @Constraints(memorySize = "0.5"))
void sort(
    @Parameter(type = Type.OBJECT, direction = Direction.INOUT)
    int[] array
);
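Conceptually, the effective requirements of each implementation in Code 12 are the common constraints combined with its own, and a resource can only run the implementations whose requirements it satisfies. The sketch below applies that idea with memory sizes in GB, as in the annotation values. The classes, the merge rule (implementation-specific values override common ones) and the resource capacities in the example are assumptions, and the real COMPSs scheduler weighs more factors than this.

```java
import java.util.ArrayList;
import java.util.List;

class ImplementationSelectionSketch {
    // Constraints of one implementation; a null field means "not constrained"
    static final class Requirements {
        final Integer computingUnits;
        final Double memorySizeGB;

        Requirements(Integer computingUnits, Double memorySizeGB) {
            this.computingUnits = computingUnits;
            this.memorySizeGB = memorySizeGB;
        }

        // Assumed merge rule: implementation-specific values override the common ones
        Requirements withOverrides(Requirements specific) {
            return new Requirements(
                specific.computingUnits != null ? specific.computingUnits : computingUnits,
                specific.memorySizeGB != null ? specific.memorySizeGB : memorySizeGB);
        }

        boolean fits(int cores, double memoryGB) {
            return (computingUnits == null || computingUnits <= cores)
                && (memorySizeGB == null || memorySizeGB <= memoryGB);
        }
    }

    // Names of the implementations that a resource with the given capacity can run
    static List<String> eligible(Requirements common, String[] names, Requirements[] specific,
                                 int cores, double memoryGB) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < names.length; i++) {
            if (common.withOverrides(specific[i]).fits(cores, memoryGB)) {
                result.add(names[i]);
            }
        }
        return result;
    }
}
```

Under these assumptions, a node with 4 cores and 1 GB of memory can only run Quicksort, while a node with 4 GB can run either implementation.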

1.4. Java API calls

COMPSs also provides an explicit synchronization call, namely barrier, which can be used through the COMPSs Java API. Calling barrier makes the main code wait for all tasks that have been submitted before the call. When all of them have finished, the execution continues (Code 13).

Code 13 COMPSs.barrier() example
import es.bsc.compss.api.COMPSs;
import simple.SimpleImpl;

public class Main {
    public static void main(String[] args) {
        // Assumes both counter files already exist (created as in Simple.java)
        String counterName1 = "counter1";
        String counterName2 = "counter2";
        // Execute task increment 1
        SimpleImpl.increment(counterName1);
        // API call to wait for all previously submitted tasks
        COMPSs.barrier();
        // Execute task increment 2
        SimpleImpl.increment(counterName2);
    }
}
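In plain-Java terms, the barrier behaves like waiting on every task submitted so far before continuing. The following analogy uses an ExecutorService to make that ordering visible; it is not how the COMPSs runtime implements COMPSs.barrier(), and BarrierSketch with its submit, barrier and log members are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BarrierSketch {
    private final ExecutorService workers = Executors.newFixedThreadPool(4);
    private final List<Future<?>> pending = new ArrayList<>();
    final ConcurrentLinkedQueue<String> log = new ConcurrentLinkedQueue<>();

    // Analogous to calling a task method: the work is submitted and the call returns at once
    void submit(String name) {
        pending.add(workers.submit(() -> log.add(name)));
    }

    // Analogous to COMPSs.barrier(): block until every task submitted so far has finished
    void barrier() {
        for (Future<?> f : pending) {
            try {
                f.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (ExecutionException e) {
                throw new IllegalStateException(e.getCause());
            }
        }
        pending.clear();
    }

    void shutdown() {
        workers.shutdown();
    }
}
```

Submitting "increment 1", calling barrier() and then submitting "increment 2" guarantees that the first task has completed before the second one is even created, just as in Code 13.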

When an object is used in a task, the COMPSs runtime stores references to it in its data structures and generates replicas and versions in remote workers. COMPSs automatically removes the replicas of obsolete versions. However, the reference to the last version of these objects could remain stored in the runtime data structures, preventing the garbage collector from removing the object even when there are no references to it left in the main code. To avoid this situation, developers can indicate to the runtime that an object is not going to be used any more by calling the deregisterObject API call. Code 14 shows a usage example of this API call.

Code 14 COMPSs.deregisterObject() example
import es.bsc.compss.api.COMPSs;

public class Main {
    public static void main(String[] args) {
        final int ITERATIONS = 10;
        for (int i = 0; i < ITERATIONS; ++i) {
            Dummy d = new Dummy(); // Dummy and TaskImpl are application classes (not shown)
            TaskImpl.task(d);
            /*Allows garbage collector to delete the
              object from memory when the task is finished */
            COMPSs.deregisterObject((Object) d);
        }
    }
}
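The problem that deregisterObject solves can be reproduced with an ordinary map: as long as a registry holds a strong reference to an object, the garbage collector cannot reclaim it, even if the main code no longer uses it. The class below is a hypothetical illustration of such a registry, not the COMPSs data structure; its deregister method plays the role of COMPSs.deregisterObject.

```java
import java.util.IdentityHashMap;
import java.util.Map;

class ObjectRegistrySketch {
    // Strong references kept by the "runtime", keyed by object identity, with a version count
    private final Map<Object, Integer> versions = new IdentityHashMap<>();

    // Called whenever an object is passed to a task: records a new version
    void register(Object o) {
        versions.merge(o, 1, Integer::sum);
    }

    // Counterpart of deregisterObject: drop the reference so the object can be collected
    void deregister(Object o) {
        versions.remove(o);
    }

    int size() {
        return versions.size();
    }
}
```

Repeating the loop of Code 14 against this registry leaves 10 objects pinned in memory if they are only registered, and none if each one is deregistered after use.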

To synchronize files, use the getFile API call, which synchronizes a file and retrieves its last version under its original name. Code 15 contains an example of its usage.

Code 15 COMPSs.getFile() example
import es.bsc.compss.api.COMPSs;

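public class Main {
    // Hypothetical file name; TaskImpl is an application class (not shown)
    private static final String FILE_NAME = "file.txt";
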
    public static void main(String[] args) {
        for (int i=0; i<1; i++) {
            TaskImpl.task(FILE_NAME, i);
        }
        /*Waits until all tasks have finished and
          synchronizes the file with its last version*/
        COMPSs.getFile(FILE_NAME);
    }
}
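The idea behind getFile can be modelled locally: tasks that write a file produce new renamed versions instead of overwriting it in place, so the file under its original name only shows the latest content once it is synchronized. The sketch below follows that model. The version-naming scheme (name.vN), the class FileVersionSketch and its methods are hypothetical and do not reflect the naming COMPSs uses internally.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class FileVersionSketch {
    private final Path original;
    private int lastVersion = 0;

    FileVersionSketch(Path original) {
        this.original = original;
    }

    private Path versionPath(int version) {
        return original.resolveSibling(original.getFileName() + ".v" + version);
    }

    // A task writing the file produces a new renamed version instead of touching the original
    void taskWrites(String content) {
        lastVersion++;
        try {
            Files.write(versionPath(lastVersion), content.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Analogous to COMPSs.getFile: bring the last version back under the original name
    void getFile() {
        if (lastVersion == 0) {
            return; // no task has written the file yet
        }
        try {
            Files.copy(versionPath(lastVersion), original, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Demo: two task versions, then synchronization; returns "<before>-><after>"
    static String demo() {
        try {
            Path file = Files.createTempDirectory("getfile-sketch").resolve("counter.txt");
            Files.write(file, "v0".getBytes(StandardCharsets.UTF_8));
            FileVersionSketch sketch = new FileVersionSketch(file);
            sketch.taskWrites("v1");
            sketch.taskWrites("v2");
            String before = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            sketch.getFile();
            String after = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            return before + "->" + after;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

FileVersionSketch.demo() returns "v0->v2": before synchronization the original name still holds the initial content, and afterwards it holds the last version.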