Task Parameters
The metadata corresponding to a parameter is specified as an argument of the @task decorator, whose name is the formal parameter's name and whose value defines the type and direction of the parameter. The parameter types and directions can be:
- Types
  - Primitive types (integer, long, float, boolean, strings)
  - Objects (instances of user-defined classes, dictionaries, lists, tuples, complex numbers)
  - Files
  - Collections (instances of lists)
  - Dictionaries (instances of dictionary)
  - Streams
  - IO streams (for binaries)
- Direction
  - Read-only (IN - default, or IN_DELETE)
  - Read-write (INOUT)
  - Write-only (OUT)
  - Concurrent (CONCURRENT)
  - Commutative (COMMUTATIVE)
COMPSs is able to automatically infer the parameter type for primitive types, strings and objects, while the user needs to specify it for files. On the other hand, the direction is only mandatory for INOUT and OUT parameters.
Note
Please note that in the following cases there is no need to include an argument in the @task decorator for a given task parameter:
Parameters of primitive types (integer, long, float, boolean) and strings: the type of these parameters can be automatically inferred by COMPSs, and their direction is always IN.
Read-only object parameters: the type of the parameter is automatically inferred, and the direction defaults to IN.
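For instance, a task that operates only on primitives needs no parameter metadata at all. The following is an illustrative sketch (the function name and the returns clause are not part of the original example):

```python
from pycompss.api.task import task

# x and y are primitives: their type is inferred and their
# direction is always IN, so no @task arguments are needed for them.
@task(returns=int)
def add(x, y):
    return x + y
```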
The parameter metadata is available from the pycompss library (Code 27):
from pycompss.api.parameter import *
Objects
The default type for a parameter is object. Consequently, there is no need to use a specific keyword. However, it is necessary to indicate its direction (except for input parameters):
PARAMETER | DESCRIPTION
---|---
IN | The parameter is read-only. The type will be inferred.
IN_DELETE | The parameter is read-only. The type will be inferred. Will be automatically removed after its usage.
INOUT | The parameter is read-write. The type will be inferred.
OUT | The parameter is write-only. The type will be inferred.
CONCURRENT | The parameter is read-write with concurrent access. The type will be inferred.
COMMUTATIVE | The parameter is read-write with commutative access. The type will be inferred.
Continuing with the example, in Code 28 the decorator specifies that func has a parameter called obj, of type object and INOUT direction. Note how the second parameter, i, does not need to be specified, since its type (integer) and direction (IN) are automatically inferred by COMPSs.
```python
from pycompss.api.task import task
from pycompss.api.parameter import INOUT, IN

@task(obj=INOUT, i=IN)
def func(obj, i):
    ...
```
The previous task definition can be simplified, since IN is the default direction (Code 29):
```python
from pycompss.api.task import task
from pycompss.api.parameter import INOUT

@task(obj=INOUT)
def func(obj, i):
    ...
```
Tip
In order to choose the appropriate direction, a good exercise is to think whether the function only consumes the object (IN), modifies the object (INOUT), or produces an object (OUT).
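Applied to that rule, a hypothetical sketch (the function names and bodies are illustrative):

```python
from pycompss.api.task import task
from pycompss.api.parameter import INOUT

# Only consumes the object -> IN (the default, so no metadata is needed)
@task(returns=int)
def count(data):
    return len(data)

# Modifies the object in place -> INOUT
@task(data=INOUT)
def append_value(data, value):
    data.append(value)
```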
Tip
The IN_DELETE definition is intended for single-use objects. Consequently, the information related to the object will be released as soon as possible.
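A minimal sketch of IN_DELETE (the task itself is illustrative):

```python
from pycompss.api.task import task
from pycompss.api.parameter import IN_DELETE

# obj is used exactly once: COMPSs may discard its data
# as soon as this task has consumed it.
@task(obj=IN_DELETE)
def consume(obj):
    print(obj)
```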
The user can also define that the access to an object is concurrent with CONCURRENT (Code 30). Tasks that share a CONCURRENT parameter will be executed in parallel, if no other dependency prevents this. The CONCURRENT direction allows users to have access from multiple tasks to the same object/file during their executions.
```python
from pycompss.api.task import task
from pycompss.api.parameter import CONCURRENT

@task(obj=CONCURRENT)
def func(obj, i):
    ...
```
Important
COMPSs does not manage the interaction with the objects used/modified concurrently. Taking care of the access/modification of the concurrent objects is the responsibility of the developer.
The user can also define that the access to a parameter is commutative with COMMUTATIVE (Code 31). The execution order of tasks that share a COMMUTATIVE parameter can be changed by the runtime following the commutative property.
```python
from pycompss.api.task import task
from pycompss.api.parameter import COMMUTATIVE

@task(obj=COMMUTATIVE)
def func(obj, i):
    ...
```
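A typical use of COMMUTATIVE is accumulating partial results into a shared object, since the order of the accumulations does not affect the final value. A sketch, where Accumulator is a hypothetical user-defined class:

```python
from pycompss.api.task import task
from pycompss.api.parameter import COMMUTATIVE

@task(acc=COMMUTATIVE)
def accumulate(acc, value):
    # Addition is commutative, so the runtime may reorder these tasks
    acc.total += value

def main():
    acc = Accumulator()  # hypothetical user-defined class with a 'total' field
    for v in range(10):
        accumulate(acc, v)
```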
Files
It is possible to define that a parameter is a file (FILE), and its direction:
PARAMETER | DESCRIPTION
---|---
FILE/FILE_IN | The parameter is a file. The direction is assumed to be IN.
FILE_INOUT | The parameter is a read-write file.
FILE_OUT | The parameter is a write-only file.
FILE_CONCURRENT | The parameter is a concurrent read-write file.
FILE_COMMUTATIVE | The parameter is a commutative read-write file.
Continuing with the example, in Code 32 the decorator specifies that func has a parameter called f, of type FILE and INOUT direction (FILE_INOUT).
```python
from pycompss.api.task import task
from pycompss.api.parameter import FILE_INOUT

@task(f=FILE_INOUT)
def func(f):
    fd = open(f, 'a+')
    ...
    # append something to fd
    ...
    fd.close()

def main():
    f = "/path/to/file.extension"
    # Populate f
    func(f)
```
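Similarly, a task that produces a file can declare it with FILE_OUT. A minimal sketch following the same pattern (the task name and content are illustrative):

```python
from pycompss.api.task import task
from pycompss.api.parameter import FILE_OUT

@task(f=FILE_OUT)
def create_file(f):
    # The file does not need to exist beforehand: the task creates it
    with open(f, 'w') as fd:
        fd.write("some content")
```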
Tip
The value for a FILE (e.g. f) is a string pointing to the file to be used in the func task. However, it can also be None if it is optional. Consequently, the user can define tasks that can receive a FILE or not, and act accordingly. For example (Code 33):
```python
from pycompss.api.task import task
from pycompss.api.parameter import FILE_IN

@task(f=FILE_IN)
def func(f):
    if f:
        # Do something with the file
        with open(f, 'r') as fd:
            num_lines = len(fd.readlines())
        return num_lines
    else:
        # Do something when there is no input file
        return -1

def main():
    f = "/path/to/file.extension"
    # Populate f
    num_lines_f = func(f)  # num_lines_f == actual number of lines of file.extension
    g = None
    num_lines_g = func(g)  # num_lines_g == -1
```
The user can also define that the access to a file parameter is concurrent with FILE_CONCURRENT (Code 34). Tasks that share a FILE_CONCURRENT parameter will be executed in parallel, if no other dependency prevents this. The CONCURRENT direction allows users to have access from multiple tasks to the same file during their executions.
```python
from pycompss.api.task import task
from pycompss.api.parameter import FILE_CONCURRENT

@task(f=FILE_CONCURRENT)
def func(f, i):
    ...
```
Important
COMPSs does not manage the interaction with the files used/modified concurrently. Taking care of the access/modification of the concurrent files is the responsibility of the developer.
The user can also define that the access to a file parameter is commutative with FILE_COMMUTATIVE (Code 35). The execution order of tasks that share a FILE_COMMUTATIVE parameter can be changed by the runtime following the commutative property.
```python
from pycompss.api.task import task
from pycompss.api.parameter import FILE_COMMUTATIVE

@task(f=FILE_COMMUTATIVE)
def func(f, i):
    ...
```
Directories
In addition to files, it is possible to define that a parameter is a directory (DIRECTORY), and its direction:
PARAMETER | DESCRIPTION
---|---
DIRECTORY_IN | The parameter is a directory and the direction is IN. The directory will be compressed before any transfer amongst nodes.
DIRECTORY_INOUT | The parameter is a read-write directory. The directory will be compressed before any transfer amongst nodes.
DIRECTORY_OUT | The parameter is a write-only directory. The directory will be compressed before any transfer amongst nodes.
The definition of a DIRECTORY parameter is shown in Code 36. The decorator specifies that func has a parameter called d, of type DIRECTORY and INOUT direction.
```python
from pycompss.api.task import task
from pycompss.api.parameter import DIRECTORY_INOUT

@task(d=DIRECTORY_INOUT)
def func(d):
    ...
```
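For instance, a DIRECTORY_IN task can inspect the directory contents with the standard os module. A sketch (the counting logic is illustrative):

```python
import os

from pycompss.api.task import task
from pycompss.api.parameter import DIRECTORY_IN

@task(d=DIRECTORY_IN, returns=int)
def count_files(d):
    # The directory is transferred to the worker, where it can be
    # traversed like any local directory.
    return len(os.listdir(d))
```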
Collections
It is possible to specify that a parameter is a collection of elements (e.g. list) and its direction.
PARAMETER | DESCRIPTION
---|---
COLLECTION_IN | The parameter is a read-only collection.
COLLECTION_IN_DELETE | The parameter is a read-only collection for single usage (will be automatically removed after its usage).
COLLECTION_INOUT | The parameter is a read-write collection.
COLLECTION_OUT | The parameter is a write-only collection.
In this case (Code 37), the list may contain sub-objects that will be handled automatically by the runtime. It is important to annotate data structures as collections if in other tasks there are accesses to individual elements of these collections as parameters. Without this annotation, the runtime will not be able to identify data dependences between the collections and the individual elements.
```python
from pycompss.api.task import task
from pycompss.api.parameter import COLLECTION

@task(my_collection=COLLECTION)
def func(my_collection):
    for element in my_collection:
        ...
```
The sub-objects of the collection can themselves be collections (and so on, recursively). In this case, the runtime also keeps track of all elements contained in all sub-collections. In order to improve the performance, the depth of the sub-objects can be limited through the use of the Depth parameter (Code 38):
```python
from pycompss.api.task import task
from pycompss.api.parameter import COLLECTION_IN, Type, Depth

@task(my_collection={Type: COLLECTION_IN, Depth: 2})
def func(my_collection):
    for inner_collection in my_collection:
        for element in inner_collection:
            # The contents of element will not be tracked
            ...
```
Tip
A collection can contain dictionaries, and they will be analyzed automatically.
Tip
If the collection is intended to be used only once with IN direction, the COLLECTION_IN_DELETE type is recommended, since it automatically removes the entire collection after the task. This enables memory and storage to be released as soon as possible.
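A minimal sketch of COLLECTION_IN_DELETE (the task itself is illustrative):

```python
from pycompss.api.task import task
from pycompss.api.parameter import COLLECTION_IN_DELETE

# The collection is consumed once; the runtime may remove it
# right after the task finishes.
@task(my_collection=COLLECTION_IN_DELETE, returns=int)
def sum_once(my_collection):
    return sum(my_collection)
```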
Collections of files
It is also possible to specify that a parameter is a collection of files (e.g. list) and its direction.
PARAMETER | DESCRIPTION
---|---
COLLECTION_FILE/COLLECTION_FILE_IN | The parameter is a read-only collection of files.
COLLECTION_FILE_INOUT | The parameter is a read-write collection of files.
COLLECTION_FILE_OUT | The parameter is a write-only collection of files.
In this case (Code 39), the list may contain files that will be handled automatically by the runtime. It is important to annotate data structures as collections if in other tasks there are accesses to individual elements of these collections as parameters. Without this annotation, the runtime will not be able to identify data dependences between the collections and the individual elements.
```python
from pycompss.api.task import task
from pycompss.api.parameter import COLLECTION_FILE

@task(my_collection=COLLECTION_FILE)
def func(my_collection):
    for file in my_collection:
        ...
```
The files of the collection can be grouped in sub-collections (and so on, recursively). In this case, the runtime also keeps track of all files contained in all sub-collections. In order to improve the performance, the depth of the sub-collections can be limited through the use of the Depth parameter, as with objects (Code 38).
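By analogy with Code 38, a depth-limited collection of files could be declared as follows (a sketch, assuming the same Type/Depth syntax applies to file collections):

```python
from pycompss.api.task import task
from pycompss.api.parameter import COLLECTION_FILE_IN, Type, Depth

@task(my_collection={Type: COLLECTION_FILE_IN, Depth: 2})
def func(my_collection):
    for inner_collection in my_collection:
        for file_path in inner_collection:
            # Files nested deeper than two levels are not tracked
            ...
```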
Dictionaries
It is possible to specify that a parameter is a dictionary of elements (e.g. dict) and its direction.
PARAMETER | DESCRIPTION
---|---
DICTIONARY_IN | The parameter is a read-only dictionary.
DICTIONARY_IN_DELETE | The parameter is a read-only dictionary for single usage (will be automatically removed after its usage).
DICTIONARY_INOUT | The parameter is a read-write dictionary.
As with the collections, it is possible to specify that a parameter is a dictionary of elements (e.g. dict) and its direction (DICTIONARY_IN or DICTIONARY_INOUT) (Code 40), whose sub-objects will be handled automatically by the runtime.
```python
from pycompss.api.task import task
from pycompss.api.parameter import DICTIONARY

@task(my_dictionary=DICTIONARY)
def func(my_dictionary):
    for k, v in my_dictionary.items():
        ...
```
The sub-objects of the dictionary can be collections or dictionaries of elements (and so on, recursively). In this case, the runtime also keeps track of all elements contained in all sub-collections/sub-dictionaries. In order to improve the performance, the depth of the sub-objects can be limited through the use of the Depth parameter (Code 41):
```python
from pycompss.api.task import task
from pycompss.api.parameter import DICTIONARY_IN, Type, Depth

@task(my_dictionary={Type: DICTIONARY_IN, Depth: 2})
def func(my_dictionary):
    for key, inner_dictionary in my_dictionary.items():
        for sub_key, sub_value in inner_dictionary.items():
            # The contents of sub_value will not be tracked
            ...
```
Tip
A dictionary can contain collections, and they will be analyzed automatically.
Tip
If the dictionary is intended to be used only once with IN direction, the DICTIONARY_IN_DELETE type is recommended, since it automatically removes the entire dictionary after the task. This enables memory and storage to be released as soon as possible.
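A minimal sketch of DICTIONARY_IN_DELETE (the task itself is illustrative):

```python
from pycompss.api.task import task
from pycompss.api.parameter import DICTIONARY_IN_DELETE

# The dictionary is consumed once; the runtime may remove it
# right after the task finishes.
@task(my_dictionary=DICTIONARY_IN_DELETE, returns=int)
def count_entries(my_dictionary):
    return len(my_dictionary)
```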
Streams
It is possible to use streams as input or output of the tasks by defining that a parameter is STREAM and its direction.
PARAMETER | DESCRIPTION
---|---
STREAM_IN | The parameter is a read-only stream.
STREAM_OUT | The parameter is a write-only stream.
For example, Code 42 shows an example using STREAM_IN and STREAM_OUT parameters. These parameters enable mixing a task-driven workflow with a data-driven workflow.
```python
from pycompss.api.task import task
from pycompss.api.parameter import STREAM_IN
from pycompss.api.parameter import STREAM_OUT

@task(ods=STREAM_OUT)
def write_objects(ods):
    ...
    for i in range(NUM_OBJECTS):
        # Build object
        obj = MyObject()
        # Publish object
        ods.publish(obj)
        ...
    ...
    # Mark the stream for closure
    ods.close()

@task(ods=STREAM_IN, returns=int)
def read_objects(ods):
    ...
    num_total = 0
    while not ods.is_closed():
        # Poll new objects
        new_objects = ods.poll()
        # Process objects
        ...
        # Accumulate read objects
        num_total += len(new_objects)
        ...
    # Return the number of processed objects
    return num_total
```
The stream parameter also supports files (Code 43).
```python
import uuid

from pycompss.api.task import task
from pycompss.api.parameter import STREAM_IN
from pycompss.api.parameter import STREAM_OUT

@task(fds=STREAM_OUT)
def write_files(fds):
    ...
    for i in range(NUM_FILES):
        file_name = str(uuid.uuid4())
        # Write file
        with open(file_name, 'w') as f:
            f.write("Test " + str(i))
        ...
    ...
    # Mark the stream for closure
    fds.close()

@task(fds=STREAM_IN, returns=int)
def read_files(fds):
    ...
    num_total = 0
    while not fds.is_closed():
        # Poll new files
        new_files = fds.poll()
        # Process files
        for nf in new_files:
            with open(nf, 'r') as f:
                ...
        # Accumulate read files
        num_total += len(new_files)
        ...
    ...
    # Return the number of processed files
    return num_total
```
In addition, the stream parameter can also be defined for binary tasks (Code 44).
```python
from pycompss.api.task import task
from pycompss.api.binary import binary
from pycompss.api.parameter import STREAM_OUT

@binary(binary="file_generator.sh")
@task(fds=STREAM_OUT)
def write_files(fds):
    # Equivalent to: ./file_generator.sh > fds
    pass
```
Standard Streams
Finally, a parameter can also be defined as the standard input, standard output, and standard error.
PARAMETER | DESCRIPTION
---|---
STDIN | The parameter is an IO stream for standard input redirection.
STDOUT | The parameter is an IO stream for standard output redirection.
STDERR | The parameter is an IO stream for standard error redirection.
Important
STDIN, STDOUT and STDERR are only supported in binary tasks.
This is particularly useful with binary tasks that consume/produce from standard IO streams, and the user wants to redirect the standard input/output/error to a particular file. Code 45 shows an example of a binary task that invokes output_generator.sh which produces the result in the standard output, and the task takes that output and stores it into fds.
```python
from pycompss.api.task import task
from pycompss.api.binary import binary
from pycompss.api.parameter import STDOUT

@binary(binary="output_generator.sh")
@task(fds=STDOUT)
def write_files(fds):
    # Equivalent to: ./output_generator.sh > fds
    pass
```