2.1. Sort by Key
Algorithm that sorts the elements of a set of files and merges the partial results respecting the order.
First of all - Create a dataset
This step can be avoided if the dataset already exists.
If not, this code snippet creates a set of files, each one containing a randomly generated dictionary serialized with pickle.
[1]:
def datasetGenerator(directory, numFiles, numPairs):
    '''
    Creates a fresh dataset directory filled with pickled random dictionaries.
    If the directory already exists it is deleted (with all its contents) and
    created again. Each generated file is named file_<index>.data and holds one
    dictionary {K: V} where K is a random float and V a random integer in [0, 1000].
    :param directory: path of the dataset directory to (re)create
    :param numFiles: number of files to generate
    :param numPairs: number of entries stored in the dictionary of each file
    '''
    import os
    import pickle
    import random
    import shutil

    if os.path.exists(directory):
        print("Dataset directory already exists... Removing")
        shutil.rmtree(directory)
    os.makedirs(directory)

    for index in range(numFiles):
        pairs = {}
        # Keep drawing until enough distinct keys exist (a repeated key just
        # overwrites the previous entry, so the length does not grow).
        while len(pairs) < numPairs:
            # The value is drawn before the key so that the sequence of random
            # numbers consumed stays the same for a given seed.
            value = random.randint(0, 1000)
            key = random.random()
            pairs[key] = value
        filename = 'file_{}.data'.format(index)
        with open(directory + '/' + filename, 'wb') as fd:
            pickle.dump(pairs, fd)
        print('File ' + filename + ' has been created.')
[2]:
# Dataset configuration: where the files are written, how many files are
# generated and how many {K: V} entries each of them holds.
directoryName = 'mydataset'
numFiles = 2
numPairs = 10
datasetGenerator(directory=directoryName, numFiles=numFiles, numPairs=numPairs)
Dataset directory already exists... Removing
File file_0.data has been created.
File file_1.data has been created.
[3]:
# Show the files that have been created
# (IPython magic; $directoryName is expanded from the Python variable of the same name)
%ls -l $directoryName
total 8
-rw-r--r-- 1 javier users 133 may 18 16:29 file_0.data
-rw-r--r-- 1 javier users 134 may 18 16:29 file_1.data
Algorithm definition
[4]:
import pycompss.interactive as ipycompss
[5]:
import os
# Start the PyCOMPSs interactive runtime with task-graph generation enabled.
# NOTE(review): BINDER_SERVICE_HOST is presumably only defined when the notebook
# runs on a Binder deployment, where explicit project/resources XML files are
# supplied instead of the monitor option - confirm against the deployment docs.
if 'BINDER_SERVICE_HOST' not in os.environ:
    ipycompss.start(graph=True, monitor=1000)
else:
    ipycompss.start(graph=True,
                    project_xml='../xml/project.xml',
                    resources_xml='../xml/resources.xml')
******************************************************
*************** PyCOMPSs Interactive *****************
******************************************************
* .-~~-.--. _____ _______ *
* : ) |____ \ / ___ \ *
* .~ ~ -.\ /.- ~~ . ___) | | (___) | *
* > `. .' < / ___/ \____ / *
* ( .- -. ) | |___ _ / / *
* `- -.-~ `- -' ~-.- -' |_____| |_| /__/ *
* ( : ) _ _ .-: *
* ~--. : .--~ .-~ .-~ } *
* ~-.-^-.-~ \_ .~ .-~ .~ *
* \ \ ' \ '_ _ -~ *
* \`.\`. // *
* . - ~ ~-.__\`.\`-.// *
* .-~ . - ~ }~ ~ ~-.~-. *
* .' .-~ .-~ :/~-.~-./: *
* /_~_ _ . - ~ ~-.~-._ *
* ~-.< *
******************************************************
* - Starting COMPSs runtime... *
* - Log path : /home/javier/.COMPSs/InteractiveMode_17/
* - PyCOMPSs Runtime started... Have fun! *
******************************************************
[6]:
from pycompss.api.task import task
from pycompss.api.parameter import FILE_IN
[7]:
@task(returns=list, dataFile=FILE_IN)
def sortPartition(dataFile):
    '''
    Loads the pickled dictionary {K: V} stored in dataFile and returns its
    entries ordered by key in ascending order.
    :param dataFile: file that contains the pickled data
    :return: a list of (K, V) pairs sorted by K.
    '''
    import operator
    import pickle
    # NOTE: pickle.load can execute arbitrary code; only use it on dataset
    # files that come from a trusted source.
    with open(dataFile, 'rb') as source:
        contents = pickle.load(source)
    pairs = list(contents.items())
    pairs.sort(key=operator.itemgetter(0))
    return pairs
[8]:
@task(returns=list, priority=True)
def reducetask(a, b):
    '''
    Merges two partial results (lists of (K, V) pairs) respecting the order.
    Both inputs are expected to be already sorted in ascending order; every
    element of a and b appears in the returned list.
    :param a: Partial result a
    :param b: Partial result b
    :return: The merging result sorted
    '''
    partial_result = []
    i = 0
    j = 0
    # Classic two-pointer merge: repeatedly take the smaller head element.
    while i < len(a) and j < len(b):
        if a[i] < b[j]:
            partial_result.append(a[i])
            i += 1
        else:
            partial_result.append(b[j])
            j += 1
    # One input is exhausted at this point; append whatever is left of the
    # other one. extend() mutates the list in place, whereas the expression
    # `partial_result + tail` would build a new list and throw it away,
    # silently dropping the remaining elements.
    partial_result.extend(a[i:])
    partial_result.extend(b[j:])
    return partial_result
[9]:
def merge_reduce(function, data):
    '''
    Reduces the elements of data to a single value by combining them pairwise
    in FIFO order: the two oldest pending elements are taken, combined with
    function, and the combined value is queued behind the remaining ones.
    :param function: callable taking two elements and returning their combination
    :param data: iterable with the elements to reduce
    :return: the single remaining element, or None when data is empty
    '''
    # A deque is a plain FIFO available under the same name in Python 2 and 3,
    # so neither a thread-synchronised Queue nor a version check is required.
    from collections import deque

    pending = deque(data)
    if not pending:
        return None
    while len(pending) > 1:
        first = pending.popleft()
        second = pending.popleft()
        pending.append(function(first, second))
    return pending.popleft()
MAIN
Parameters (that can be configured in the following cell):
* datasetPath: The path where the dataset is located (default: the directory created previously).
[10]:
import os
import time
from pycompss.api.api import compss_wait_on
import pprint

# Where the dataset is (defaults to the directory generated previously)
datasetPath = directoryName

# Path of every entry found in the dataset directory
files = [datasetPath + '/' + name for name in os.listdir(datasetPath)]

startTime = time.time()

# One sortPartition call per file, then pairwise merging of the partial
# results; compss_wait_on is applied to the final merged value before it is
# printed.
partialSorted = [sortPartition(path) for path in files]
result = compss_wait_on(merge_reduce(reducetask, partialSorted))

print("Elapsed Time(s)")
print(time.time() - startTime)
pprint.pprint(result)
Found task: sortPartition
Found task: reducetask
Elapsed Time(s)
3.6193034648895264
[(0.027312894275046573, 993),
(0.07138432853012677, 426),
(0.10308291658301261, 252),
(0.10421523358827744, 356),
(0.10743720335209561, 614),
(0.19426330574322814, 89),
(0.2120037521887378, 4),
(0.21274769665858428, 680),
(0.27702759534915444, 393),
(0.29308205906959617, 789),
(0.31724024656512495, 669),
(0.42922792366256235, 700),
(0.4319642313815307, 756),
(0.46956964955534164, 707),
(0.6944486231937671, 841),
(0.708700562554975, 720),
(0.7478662969947636, 874),
(0.9589965652687729, 304),
(0.9687167493887274, 12)]
[11]:
# Stop the PyCOMPSs interactive runtime started earlier in this notebook.
ipycompss.stop()
******************************************************
*************** STOPPING PyCOMPSs ******************
******************************************************
Checking if any issue happened.
Warning: some of the variables used with PyCOMPSs may
have not been brought to the master.
******************************************************