1.6. PyCOMPSs: Using objects, lists, and synchronization. Using dictionaries.
In this example we will see how classes and objects can be used from PyCOMPSs, and how class methods can become tasks. The example also illustrates the use of dictionaries as task parameters.
Import the PyCOMPSs library
[1]:
import pycompss.interactive as ipycompss
Start the runtime
Initialize the COMPSs runtime. The parameters indicate whether the execution will generate a task graph and a trace file, set the monitor interval, and enable debug information.
[2]:
import os
if 'BINDER_SERVICE_HOST' in os.environ:
    ipycompss.start(graph=True, debug=True,
                    project_xml='../xml/project.xml',
                    resources_xml='../xml/resources.xml')
else:
    ipycompss.start(graph=True, monitor=1000, debug=True, trace=False)
********************************************************
**************** PyCOMPSs Interactive ******************
********************************************************
* .-~~-.--. ______ ______ *
* : ) |____ \ |____ \ *
* .~ ~ -.\ /.- ~~ . __) | __) | *
* > `. .' < |__ | |__ | *
* ( .- -. ) ____) | _ ____) | *
* `- -.-~ `- -' ~-.- -' |______/ |_| |______/ *
* ( : ) _ _ .-: *
* ~--. : .--~ .-~ .-~ } *
* ~-.-^-.-~ \_ .~ .-~ .~ *
* \ \ ' \ '_ _ -~ *
* \`.\`. // *
* . - ~ ~-.__\`.\`-.// *
* .-~ . - ~ }~ ~ ~-.~-. *
* .' .-~ .-~ :/~-.~-./: *
* /_~_ _ . - ~ ~-.~-._ *
* ~-.< *
********************************************************
* - Starting COMPSs runtime... *
* - Log path : /home/user/.COMPSs/Interactive_06/
* - PyCOMPSs Runtime started... Have fun! *
********************************************************
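The two branches of the start cell differ only in the keyword arguments passed to `ipycompss.start`. One way to make that choice easy to inspect is to build the arguments before starting the runtime. The sketch below assumes a hypothetical helper, `start_kwargs`, which is not part of PyCOMPSs; the argument names and values are copied from the cell above, and the reading of `BINDER_SERVICE_HOST` as a marker for a Binder deployment is an assumption.

```python
import os


def start_kwargs(environ):
    """Return keyword arguments for ipycompss.start (hypothetical helper)."""
    if "BINDER_SERVICE_HOST" in environ:
        # Assumed to be a Binder deployment: use the bundled XML files.
        return {
            "graph": True,
            "debug": True,
            "project_xml": "../xml/project.xml",
            "resources_xml": "../xml/resources.xml",
        }
    # Any other environment: enable the monitor and leave tracing off.
    return {"graph": True, "monitor": 1000, "debug": True, "trace": False}


kwargs = start_kwargs(os.environ)
print(kwargs)
# With PyCOMPSs installed, the runtime would then be started with:
# ipycompss.start(**kwargs)
```

Keeping the selection in a plain function means it can be checked without starting a runtime.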
Importing the task and parameter directionality modules
Import the task module before annotating functions or methods.
[3]:
from pycompss.api.api import compss_barrier
from pycompss.api.api import compss_wait_on
from pycompss.api.task import task
from pycompss.api.parameter import *
Declaring a class
[4]:
%%writefile my_shaper.py
from pycompss.api.task import task
from pycompss.api.parameter import IN

class Shape(object):
    def __init__(self, x, y):
        self.x = x
        self.y = y
        # Stored on the instance so that describe() overwrites the same attribute
        self.description = "This shape has not been described yet"

    # target_direction=IN: the task only reads the object
    @task(returns=int, target_direction=IN)
    def area(self):
        import time
        time.sleep(4)
        return self.x * self.y

    # No target_direction given: the task modifies the object
    @task()
    def scaleSize(self, scale):
        import time
        time.sleep(4)
        self.x = self.x * scale
        self.y = self.y * scale

    @task(returns=int, target_direction=IN)
    def perimeter(self):
        import time
        time.sleep(4)
        return 2 * self.x + 2 * self.y

    # Regular method, not a task
    def describe(self, text):
        self.description = text

    @task(target_direction=IN)
    def infoShape(self):
        import time
        time.sleep(1)
        print('Shape x=', self.x, 'y= ', self.y)
Writing my_shaper.py
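The bodies of the task-decorated methods are ordinary Python, so their logic can be checked without a running COMPSs runtime. The sketch below is not part of PyCOMPSs: it assumes a hypothetical no-op stand-in for `task` that ignores its arguments and returns the method unchanged, and it repeats only two methods of `Shape`, without the `time.sleep` calls.

```python
# Hypothetical stand-in for pycompss.api.task.task, for local testing only.
# It accepts any arguments and returns the function unchanged, so calls
# run immediately and sequentially instead of becoming tasks.
def task(*args, **kwargs):
    def decorator(func):
        return func
    return decorator


IN = "IN"  # placeholder; the real constant lives in pycompss.api.parameter


class Shape(object):
    def __init__(self, x, y):
        self.x = x
        self.y = y

    @task(returns=int, target_direction=IN)
    def area(self):
        return self.x * self.y

    @task()
    def scaleSize(self, scale):
        self.x = self.x * scale
        self.y = self.y * scale


shape = Shape(100, 45)
local_area = shape.area()    # plain int, no synchronization needed
shape.scaleSize(2)           # mutates the object in place
scaled_area = shape.area()
print(local_area, scaled_area)
```

With the stand-in, method calls return plain values rather than future objects, so no `compss_wait_on` is needed; that difference is the reason this is only suitable for checking the method logic.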
[5]:
@task(returns=int, mydict=DICTIONARY_IN)
def addAll(mydict):
    import time
    time.sleep(4)
    total = 0  # named total to avoid shadowing the built-in sum
    for key, value in mydict.items():
        total = total + value
    return total
[6]:
@task(returns=2, mydict=DICTIONARY_IN, my_otherdict=DICTIONARY_IN)
def addAll_2(mydict, my_otherdict):
    import time
    time.sleep(4)
    total = 0
    total2 = 0
    for key, value in mydict.items():
        total = total + value
    for key2, value2 in my_otherdict.items():
        total2 = total2 + value2
    return total, total2
[7]:
@task(mydict=DICTIONARY_INOUT)
def scale_all(mydict, scale):
    import time
    time.sleep(4)
    # Scale every shape stored in the dictionary in place
    for key, value in mydict.items():
        mydict[key].x = value.x * scale
        mydict[key].y = value.y * scale
Invoking tasks
[8]:
from my_shaper import Shape
[9]:
my_shapes = {}
my_shapes["rectangle"] = Shape(100,45)
my_shapes["square"] = Shape(50,50)
my_shapes["long_rectangle"] = Shape(10,100)
my_shapes["small_rectangle"] = Shape(20,30)
[10]:
all_areas = {}
[11]:
for key, value in my_shapes.items():
    all_areas[key] = value.area()
Synchronizing results from tasks
[12]:
all_areas = compss_wait_on(all_areas)
print(all_areas)
{'rectangle': 4500, 'square': 2500, 'long_rectangle': 1000, 'small_rectangle': 600}
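The pattern used here (launch one task per shape, keep the returned placeholders in a dictionary, then synchronize once) has a rough analogue in the Python standard library. The sketch below uses `concurrent.futures` purely as an analogy; it is not how PyCOMPSs is implemented, and the `area` function and `sizes` dictionary are illustrative stand-ins for `Shape.area` and `my_shapes`.

```python
from concurrent.futures import ThreadPoolExecutor


def area(x, y):
    # Plain function standing in for the Shape.area task.
    return x * y


sizes = {
    "rectangle": (100, 45),
    "square": (50, 50),
    "long_rectangle": (10, 100),
    "small_rectangle": (20, 30),
}

with ThreadPoolExecutor(max_workers=4) as pool:
    # submit() returns a Future right away, much like a task call.
    futures = {key: pool.submit(area, x, y) for key, (x, y) in sizes.items()}
    # result() blocks until the value is ready, much like compss_wait_on.
    areas = {key: future.result() for key, future in futures.items()}

print(areas)
```

The point of the analogy is that the loop that launches the work does not wait for any result; waiting happens only at the explicit synchronization step.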
[13]:
rectangle = Shape(200,25)
rectangle.scaleSize(5)
area_rectangle = rectangle.area()
rectangle = compss_wait_on(rectangle)
print('X =', rectangle.x)
area_rectangle = compss_wait_on(area_rectangle)
print('Area =', area_rectangle)
X = 1000
Area = 125000
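The printed values show that `area` saw the object after `scaleSize` had modified it, although no synchronization was requested between the two calls. This is consistent with the runtime ordering the two tasks by their dependency on `rectangle`. The arithmetic can be confirmed in plain Python; the variable names below are illustrative.

```python
x, y, scale = 200, 25, 5

area_before_scaling = x * y                 # what area() would return if it ran first
scaled_x, scaled_y = x * scale, y * scale   # effect of scaleSize(5)
area_after_scaling = scaled_x * scaled_y    # what area() returns after scaling

print("X =", scaled_x)
print("Area =", area_after_scaling)
print("Area without scaling =", area_before_scaling)
```

Only the value computed after scaling matches the notebook output, so the `area` task must have run on the scaled object.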
Accessing data in collections
[14]:
all_perimeters = {}
my_shapes["new_shape"] = rectangle
for key, value in my_shapes.items():
    all_perimeters[key] = value.perimeter()
[15]:
mysum = addAll(all_perimeters)
mysum = compss_wait_on(mysum)
print(mysum)
Task definition detected.
Found task: addAll
3060
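The value 3060 can be checked by hand from the shape dimensions. The sketch below repeats the formula from `Shape.perimeter` in plain Python, with no runtime involved. The `dimensions` dictionary is an illustrative restatement of the shapes created above, with `new_shape` at its scaled size of 1000 by 125.

```python
dimensions = {
    "rectangle": (100, 45),
    "square": (50, 50),
    "long_rectangle": (10, 100),
    "small_rectangle": (20, 30),
    "new_shape": (1000, 125),  # Shape(200, 25) after scaleSize(5)
}

# Same formula as Shape.perimeter: 2 * x + 2 * y
perimeters = {key: 2 * x + 2 * y for key, (x, y) in dimensions.items()}
perimeter_sum = sum(perimeters.values())

print(perimeters)
print(perimeter_sum)
```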
Accessing two collections
[16]:
all_perimeters = {}
all_areas = {}
for key, value in my_shapes.items():
    all_perimeters[key] = value.perimeter()
    all_areas[key] = value.area()
[17]:
[my_per, my_area] = addAll_2(all_perimeters, all_areas)
[my_per, my_area] = compss_wait_on([my_per, my_area])
print([my_per, my_area])
Task definition detected.
Found task: addAll_2
[3060, 133600]
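`addAll_2` declares `returns=2` and returns a tuple, which the caller unpacks into two names. A plain-Python version of the same function reproduces both totals. It is named `add_all_2_local` here to mark it as a hypothetical, undecorated copy; the perimeter values follow from `2 * x + 2 * y` for each shape, and the areas from `x * y`.

```python
def add_all_2_local(mydict, my_otherdict):
    # Undecorated copy of addAll_2: one total per dictionary.
    return sum(mydict.values()), sum(my_otherdict.values())


perimeters = {
    "rectangle": 290,
    "square": 200,
    "long_rectangle": 220,
    "small_rectangle": 100,
    "new_shape": 2250,
}
areas = {
    "rectangle": 4500,
    "square": 2500,
    "long_rectangle": 1000,
    "small_rectangle": 600,
    "new_shape": 125000,
}

my_per, my_area = add_all_2_local(perimeters, areas)
print([my_per, my_area])
```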
Scattering data from a collection
[18]:
scale_all(my_shapes, 2)
scaled_areas = {}
for key, value in my_shapes.items():
    scaled_areas[key] = value.area()
scaled_areas = compss_wait_on(scaled_areas)
print(scaled_areas)
Task definition detected.
Found task: scale_all
{'rectangle': 18000, 'square': 10000, 'long_rectangle': 4000, 'small_rectangle': 2400, 'new_shape': 500000}
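Because `scale_all` multiplies both `x` and `y` by the scale factor, each area grows by the square of that factor. A quick check in plain Python, using the areas of the five shapes before scaling as input (the names `areas_before` and `expected_scaled_areas` are illustrative):

```python
scale = 2

areas_before = {
    "rectangle": 4500,
    "square": 2500,
    "long_rectangle": 1000,
    "small_rectangle": 600,
    "new_shape": 125000,
}

# Scaling both sides by `scale` multiplies the area by scale ** 2.
expected_scaled_areas = {
    key: value * scale ** 2 for key, value in areas_before.items()
}
print(expected_scaled_areas)
```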
Stop the runtime
[19]:
ipycompss.stop(sync=True)
********************************************************
***************** STOPPING PyCOMPSs ********************
********************************************************
Checking if any issue happened.
Synchronizing all future objects left on the user scope.
********************************************************