diff --git a/docs/api.rst b/docs/api.rst index 2a9f345..864486e 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -1,2 +1,66 @@ PySke API -========= \ No newline at end of file +========= + +Pyske API offer applications implemented with list and tree skeletons. +The user can use the sequential or parallel version. +The parallel version allows a faster execution time when its launched on several processors, cores or computers. + +Dot Product +----------- + +Discrete Fast Fourier Transform +------------------------------- + +K-means Clustering +------------------ + +K-means clustering is an unsupervised algorithm that aims to partition group of points in k clusters. + +K-means function +^^^^^^^^^^^^^^^^ + +.. py:module:: pyske.examples.list.k_means + +.. autofunction:: k_means + +Initialization functions +^^^^^^^^^^^^^^^^^^^^^^^^ + +This is the standard method that initializes the centroids. This method chooses the centroids in order that each point is as far as possible from the other. + +.. autofunction:: k_means_init + + +Point Interface +^^^^^^^^^^^^^^^ + +K-means algorithm takes a list of points in parameters. For now two versions implement this class, one for 2 dimension points and another for 3 dimension points. + +Point 2D class implementation: + +.. autoclass:: pyske.core.util.point_2D.Point_2D + :members: + :special-members: + :member-order: bysource + +Running Example +^^^^^^^^^^^^^^^^^^^^ + +.. argparse:: + :module: pyske.examples.list.util + :func: k_means_parser + :prog: python3 k_means_main.py + + +Maximum Prefix Sum +------------------ + +Maximum Segment Sum +------------------- + +Parallel Regular Sampling Sort +------------------------------ + +Variance Example +---------------- + diff --git a/docs/conf.py b/docs/conf.py index 8e83820..fe8f596 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -10,9 +10,9 @@ # add these directories to sys.path here. If the directory is relative to the # documentation root, use os.path.abspath to make it absolute, like shown here. # -# import os -# import sys -# sys.path.insert(0, os.path.abspath('.')) +import os +import sys +sys.path.insert(0, os.path.abspath('../.')) # -- Project information ----------------------------------------------------- @@ -31,6 +31,8 @@ # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. extensions = [ + "sphinx.ext.autodoc", + "sphinxarg.ext" ] # Add any paths that contain templates here, relative to this directory. @@ -52,4 +54,4 @@ # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ['_static'] \ No newline at end of file +html_static_path = ['_static'] diff --git a/pyske/core/array/__init__.py b/pyske/core/array/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyske/core/array/array_interface.py b/pyske/core/array/array_interface.py new file mode 100644 index 0000000..35d3a24 --- /dev/null +++ b/pyske/core/array/array_interface.py @@ -0,0 +1,211 @@ +""" +Interface for PySke array. + +Interfaces: Array2D. +""" + +from abc import ABC, abstractmethod +from enum import Enum +from typing import Callable, Generic, TypeVar, Optional + +# pylint: disable=unused-import +from pyske.core.interface import List +from pyske.core.support import parallel as parimpl + +T = TypeVar('T') # pylint: disable=invalid-name +U = TypeVar('U') # pylint: disable=invalid-name +V = TypeVar('V') # pylint: disable=invalid-name + +_PID: int = parimpl.PID +_NPROCS: int = parimpl.NPROCS +_COMM = parimpl.COMM + +class Distribution(Enum): + LINE = 'LINE' + COLUMN = 'COLUMN' + +class Array2D(ABC, Generic[T]): + """ + PySke array2d (interface) + + Static methods: + init. + + Methods: + map, reduce, distribute, + get_partition. + """ + + @abstractmethod + def __init__(self: 'Array2D[T]') -> None: + """ + Return an empty array. + """ + + @staticmethod + @abstractmethod + def init(value_at: Callable[[int, int], V], distribution: Distribution, col_size: int, + line_size: int) -> 'Array2D[V]': + """ + Return an array built using a function + + Example:: + + >>> from pyske.core.array.sarray2d import SArray2D + >>> from pyske.core.array.array_interface import Distribution + >>> number_line = 2 + >>> number_column = 2 + >>> init_function = lambda line, column: line * number_column + column + >>> SArray2D.init(init_function, Distribution.LINE, number_column, number_line) + ( 0 1 ) + ( 2 3 ) + + :param value_at: binary function + :param distribution: the distribution direction (LINE, COLUMN) + :param col_size: number of columns + :param line_size: number of lines + :return: an 2d array of the given line and column size, where for all valid line column + i, j, the value at this index is value_at(i, j) + """ + + @abstractmethod + def distribute(self: 'Array2D[T]', distribution_direction: Distribution) -> 'Array2D[T]': + """ + Copy the array while changing its distribution. + + In sequential, it just returns ``self``. In parallel, communications + are performed to meet line or column distribution. + + Examples:: + + >>> from pyske.core.array.sarray2d import SArray2D + >>> from pyske.core.array.array_interface import Distribution + >>> sarray2d = SArray2D.init(lambda i, j: 1, Distribution.LINE, col_size=2, line_size=2) + >>> sarray2d.distribute(Distribution.COLUMN) + ( 1 1 ) + ( 1 1 ) + + :param distribution_direction: the distribution direction (LINE, COLUMN) + :return: an array containing the same elements. + """ + + @abstractmethod + def map(self: 'Array2D[T]', unary_op: Callable[[T], V]) -> 'Array2D[V]': + """ + Apply a function to all the elements. + + The returned array has the same shape (same size, same distribution) + than the initial array. + + Examples:: + + >>> from pyske.core.array.sarray2d import SArray2D + >>> from pyske.core.array.parray2d import PArray2D + >>> from pyske.core.array.array_interface import Distribution + >>> col_size = 2 + >>> line_size = 2 + >>> SArray2D.init(lambda i, j: 1, Distribution.LINE, col_size, line_size).map(lambda x: x + 1) + ( 2 2 ) + ( 2 2 ) + >>> parray2d = PArray2D.init(lambda i, j: 1, Distribution.LINE, col_size=2, line_size=2).map(lambda x: x + 1) + >>> parray2d.to_seq() + ( 2 2 ) + ( 2 2 ) + + :param unary_op: function to apply to elements + :return: a new array + """ + + @abstractmethod + def reduce(self: 'Array2D[T]', binary_op: Callable[[T, T], T], + neutral: Optional[T] = None) -> T: + """ + Reduce an array of value to one value. + + Examples:: + + >>> from pyske.core.array.sarray2d import SArray2D + >>> from pyske.core.array.parray2d import PArray2D + >>> from pyske.core.array.array_interface import Distribution + >>> parray2d = PArray2D.init(lambda i, j: 1, Distribution.COLUMN, col_size=2, line_size=2) + >>> parray2d.reduce(lambda x, y: x + y) + 4 + >>> SArray2D().reduce(lambda x, y: x + y, 0) + 0 + + :param binary_op: a binary associative and commutative operation + :param neutral: (optional): + a value that should be a neutral element for the operation, + i.e. for all element e, + ``binary_op(neutral, e) == binary_op(e, neutral) == e``. + If this argument is omitted the list should not be empty. + :return: a value + """ + + @abstractmethod + def get_partition(self: 'Array2D[T]') -> 'List[Array2D[T]]': + """ + Make the distribution visible. + + Examples:: + + >>> from pyske.core.array.sarray2d import SArray2D + >>> from pyske.core.array.parray2d import PArray2D + >>> from pyske.core.array.array_interface import Distribution + >>> from pyske.core.util import par + >>> col_size = 2 + >>> line_size = 2 + >>> init_function = lambda line, column: line * col_size + column + >>> SArray2D.init(init_function, Distribution.LINE, col_size, line_size).get_partition() + [( 0 1 ) + ( 2 3 )] + >>> parray2d = PArray2D.init(init_function, Distribution.LINE, col_size=2, line_size=2) + >>> parray2d.get_partition().to_seq() if par.procs() == [0, 1] else '[( 0 1 ), ( 2 3 )]' + '[( 0 1 ), ( 2 3 )]' + + :return: a list of array. + """ + + @abstractmethod + def map2(self: 'Array2D[T]', binary_op: Callable[[T, U], V], + a_array: 'Array2D[U]') -> 'Array2D[V]': + """ + Apply a function to all the elements of ``self`` and an array. + + The returned array has the same shape (same size, same distribution) + than the initial arrays. + + Examples:: + + >>> from pyske.core.array.sarray2d import SArray2D + >>> from pyske.core.array.array_interface import Distribution + >>> sarray2d = SArray2D.init(lambda line, column: 1, Distribution.LINE, col_size = 2, line_size = 2) + >>> sarray2d.map2(lambda x, y: x + y, sarray2d) + ( 2 2 ) + ( 2 2 ) + + :param binary_op: function to apply to each pair of elements + :param a_array: the second array. + The second array must have same column and line size than `self`. + :return: a new array. + """ + + @abstractmethod + def to_seq(self: 'Array2D[T]') -> 'Array2D[T]': + """ + Return a sequential array with same content. + + Examples:: + + >>> from pyske.core.array.sarray2d import SArray2D + >>> from pyske.core.array.parray2d import PArray2D + >>> from pyske.core.array.array_interface import Distribution + >>> PArray2D.init(lambda i, j: 1, Distribution.COLUMN, col_size=2, line_size=2).to_seq() + ( 1 1 ) + ( 1 1 ) + >>> SArray2D.init(lambda line, column: 1, Distribution.LINE, col_size = 2, line_size = 2).to_seq() + ( 1 1 ) + ( 1 1 ) + + :return: a sequential array. + """ diff --git a/pyske/core/array/parray2d.py b/pyske/core/array/parray2d.py new file mode 100644 index 0000000..28e88c0 --- /dev/null +++ b/pyske/core/array/parray2d.py @@ -0,0 +1,199 @@ +""" +A module of parallel arrays and associated skeletons + +class PArray2D: parallel arrays. +""" +from typing import Callable, TypeVar, Generic, Optional +from enum import Enum + +from pyske.core import SList, PList +from pyske.core.array import array_interface +from pyske.core.array.array_interface import Distribution +from pyske.core.array.sarray2d import SArray2D +from pyske.core.support import parallel as parimpl + +_PID: int = parimpl.PID +_NPROCS: int = parimpl.NPROCS +_COMM = parimpl.COMM + +T = TypeVar('T') # pylint: disable=invalid-name +U = TypeVar('U') # pylint: disable=invalid-name +V = TypeVar('V') # pylint: disable=invalid-name + + +def _local_index(distribution: Enum, col_size: int, line_size: int, pid: int): + local_sizes = SList([]) + for i in range(_NPROCS): + if distribution == Distribution.LINE: + local_sizes.append(parimpl.local_size(i, line_size)) + else: + local_sizes.append(parimpl.local_size(i, col_size)) + start_indexes = local_sizes.scanl(lambda x, y: x + y, 0) + if distribution == Distribution.LINE: + return (start_indexes[pid], start_indexes[pid] + local_sizes[pid] - 1), (0, col_size - 1) + return (0, line_size - 1), (start_indexes[pid], start_indexes[pid] + local_sizes[pid] - 1) + + +class PArray2D(array_interface.Array2D, Generic[T]): + # pylint: disable=protected-access + """ + Distributed arrays + + Static methods from interface IArray: + init. + + Methods from interface IArray: + map,reduce, + get_partition,distribute + """ + + def __init__(self: 'PArray2D[T]'): + super().__init__() + self.__global_index = ((-1, -1), (-1, -1)) + self.__local_index = ((-1, -1), (-1, -1)) + self.__content = SArray2D([], -1, -1) + self.__distribution = [((-1, -1), (-1, -1)) for _ in range(0, _NPROCS)] + self.__distribution_direction = Distribution.LINE + + def __get_shape(self: 'PArray2D[T]') -> 'PArray2D': + p_array2d = PArray2D() + p_array2d.__global_index = self.__global_index + p_array2d.__local_index = self.__local_index + p_array2d.__distribution = self.__distribution + p_array2d.__distribution_direction = self.__distribution_direction + return p_array2d + + def __str__(self: 'PArray2D[T]') -> str: + return "PID[" + str(_PID) + "]:\n" + \ + " global_index: " + str(self.__global_index) + "\n" + \ + " local_index: " + str(self.__local_index) + "\n" + \ + " distribution: " + str(self.__distribution) + "\n" + \ + " content: \n" + str(self.__content) + "\n" + + @staticmethod + def init(value_at: Callable[[int, int], V], distribution: Distribution, + col_size: int = _NPROCS, + line_size: int = _NPROCS) -> 'PArray2D[V]': + assert _NPROCS <= col_size + assert _NPROCS <= line_size + + parray2d = PArray2D() + parray2d.__global_index = ((0, line_size - 1), (0, col_size - 1)) + + parray2d.__local_index = _local_index(distribution, col_size, line_size, _PID) + + content = [] + for line in range(parray2d.__local_index[0][0], parray2d.__local_index[0][1] + 1): + for column in range(parray2d.__local_index[1][0], parray2d.__local_index[1][1] + 1): + content.append(value_at(line, column)) + local_line_size = parray2d.__local_index[0][1] - parray2d.__local_index[0][0] + 1 + local_col_size = parray2d.__local_index[1][1] - parray2d.__local_index[1][0] + 1 + parray2d.__content = SArray2D(content, local_line_size, local_col_size) + + parray2d.__distribution_direction = distribution + parray2d.__distribution = [ + _local_index(parray2d.__distribution_direction, col_size, line_size, i) for i in + range(_NPROCS)] + + return parray2d + + def __distribute_column(self: 'PArray2D[T]', new_parray: 'PArray2D[T]', local_line_size, + local_col_size): + col_size = self.__global_index[1][1] - self.__global_index[1][0] + 1 + # update content for each process + for i in range(0, _NPROCS): + content_to_send = [] + for j in range(len(self.__content)): + if j % col_size in range(new_parray.__distribution[i][1][0], + new_parray.__distribution[i][1][1] + 1): + content_to_send.append(self.__content.values[j]) + if i == _PID: + content = _COMM.gather(content_to_send, i) + # flatten the list + content = [item for items in content for item in items] + new_parray.__content = SArray2D(content, local_line_size, local_col_size) + else: + _COMM.gather(content_to_send, i) + + return new_parray + + def __distribute_line(self: 'PArray2D[T]', new_parray: 'PArray2D[T]', local_line_size, + local_col_size, old_distribution): + # update content for each process + for i in range(0, _NPROCS): + content = [] + old_local_col_size = old_distribution[_PID][1][1] - old_distribution[_PID][1][0] + 1 + start_index = new_parray.__distribution[i][0][0] * old_local_col_size + stop_index = (new_parray.__distribution[i][0][1] + 1) * old_local_col_size + for j in range(start_index, stop_index, old_local_col_size): + content_to_send = self.__content.values[j:j+old_local_col_size] + content_to_send = _COMM.allgather(content_to_send) + content.extend([item for items in content_to_send for item in items]) + if i == _PID: + new_parray.__content = SArray2D(content, local_line_size, local_col_size) + + return new_parray + + def distribute(self: 'PArray2D[T]', distribution_direction: Distribution) -> 'PArray2D[T]': + if distribution_direction == self.__distribution_direction: + return self + parray2d = PArray2D() + parray2d.__global_index = self.__global_index + + line_size = self.__global_index[0][1] - self.__global_index[0][0] + 1 + col_size = self.__global_index[1][1] - self.__global_index[1][0] + 1 + + old_distribution = self.__distribution + + parray2d.__local_index = _local_index(distribution_direction, col_size, line_size, _PID) + parray2d.__distribution_direction = distribution_direction + parray2d.__distribution = [ + _local_index(parray2d.__distribution_direction, col_size, line_size, i) for i in + range(_NPROCS)] + local_line_size = parray2d.__local_index[0][1] - parray2d.__local_index[0][0] + 1 + local_col_size = parray2d.__local_index[1][1] - parray2d.__local_index[1][0] + 1 + if distribution_direction == Distribution.COLUMN: + parray2d = self.__distribute_column(parray2d, local_line_size, local_col_size) + else: + parray2d = self.__distribute_line(parray2d, local_line_size, local_col_size, old_distribution) + + return parray2d + + def map(self: 'PArray2D[T]', unary_op: Callable[[T], V]) -> 'PArray2D[V]': + p_array2d = self.__get_shape() + p_array2d.__content = self.__content.map(unary_op) + return p_array2d + + def reduce(self: 'PArray2D[T]', binary_op: Callable[[T, T], T], + neutral: Optional[T] = None) -> T: + if neutral is None: + assert self.__global_index != ((-1, -1), (-1, -1)) + partial = self.__content.reduce(binary_op) + else: + partial = self.__content.reduce(binary_op, neutral) + partials = SArray2D(_COMM.allgather(partial), self.__content.line_size, + self.__content.column_size) + return partials.reduce(binary_op, neutral) + + def get_partition(self: 'PArray2D[T]') -> 'PList[SArray2D[T]]': + contents = _COMM.allgather(self.__content) + p_list = PList().init(lambda i: contents[i], _NPROCS) + return p_list + + def map2(self: 'PArray2D[T]', binary_op: Callable[[T, U], V], + a_array: 'PArray2D[U]') -> 'PArray2D[V]': + assert self.__distribution == a_array.__distribution + p_array2d = self.__get_shape() + p_array2d.__content = self.__content.map2(binary_op, a_array.__content) + return p_array2d + + def to_seq(self: 'PArray2D[T]') -> 'SArray2D[T]': + parray2d = self + if self.__distribution_direction == Distribution.COLUMN: + parray2d = self.distribute(Distribution.LINE) + col_size = parray2d.__global_index[1][1] - parray2d.__global_index[1][0] + 1 + line_size = parray2d.__global_index[0][1] - parray2d.__global_index[0][0] + 1 + content = parray2d.get_partition() \ + .reduce(lambda a_sarray, b_sarray: SArray2D.concat(a_sarray, b_sarray), + SArray2D([], 0, 0)).values + return SArray2D(content, line_size, col_size) diff --git a/pyske/core/array/sarray2d.py b/pyske/core/array/sarray2d.py new file mode 100644 index 0000000..432df3f --- /dev/null +++ b/pyske/core/array/sarray2d.py @@ -0,0 +1,110 @@ +""" +A module of sequential arrays and associated skeletons + +class SArray2D: sequential arrays. +""" +import functools +from typing import TypeVar, Generic, Callable, Optional + +# pylint: disable=unused-import +from pyske.core import SList +from pyske.core.array.array_interface import Array2D, Distribution + +T = TypeVar('T') # pylint: disable=invalid-name +U = TypeVar('U') # pylint: disable=invalid-name +V = TypeVar('V') # pylint: disable=invalid-name + + +class SArray2D(Array2D, Generic[T]): + """ + Sequential arrays + + Static methods from interface IArray: + init. + + Methods from interface IArray: + map,reduce, + get_partition,distribute + + """ + + def __init__(self, content=None, line_size=-1, col_size=-1): + super().__init__() + if content is None: + content = [] + self.__line_size = line_size + self.__column_size = col_size + self.__values = content + + @property + def values(self): + return self.__values + + @property + def line_size(self): + return self.__line_size + + @property + def column_size(self): + return self.__column_size + + def __str__(self): + content = "" + for i in range(self.__line_size): + content += "(" + for j in range(self.__column_size): + content += '%4s' % self.__values[i * self.__column_size + j] + content += '%4s' % ')' + if i != self.line_size - 1: + content += '\n' + return content + + def __repr__(self): + return str(self) + + def __len__(self): + return self.__column_size * self.__line_size + + @staticmethod + def init(value_at: Callable[[int, int], V], _: Distribution, col_size: int, + line_size: int) -> 'SArray2D[V]': + assert col_size > 0 + assert line_size > 0 + content = [] + for line in range(line_size): + for column in range(col_size): + content.append(value_at(line, column)) + sarray2d = SArray2D(content, line_size, col_size) + return sarray2d + + def map(self: 'SArray2D[T]', unary_op: Callable[[T], V]) -> 'SArray2D[V]': + content = list(map(unary_op, self.__values)) + return SArray2D(content, self.__line_size, self.__column_size) + + def reduce(self: 'SArray2D[T]', binary_op: Callable[[T, T], T], + neutral: Optional[T] = None) -> T: + if neutral is None: + return functools.reduce(binary_op, self.__values) + return functools.reduce(binary_op, self.__values, neutral) + + def get_partition(self: 'SArray2D[T]') -> 'SList[SArray2D[T]]': + return SList([self]) + + def distribute(self: 'SArray2D[T]', distribution_direction: Distribution) -> 'SArray2D[T]': + return self + + def map2(self: 'SArray2D[T]', binary_op: Callable[[T, U], V], + a_array: 'SArray2D[U]') -> 'SArray2D[V]': + assert self.__line_size == a_array.line_size + assert self.__column_size == a_array.column_size + content = [binary_op(left, right) for (left, right) in zip(self.__values, a_array.values)] + return SArray2D(content, self.__line_size, self.__column_size) + + def to_seq(self: 'SArray2D[T]') -> 'SArray2D[T]': + return self + + @staticmethod + def concat(a_sarray: 'SArray2D[T]', b_sarray: 'SArray2D[T]') -> 'SArray2D[T]': + line_size = a_sarray.line_size + b_sarray.line_size + col_size = a_sarray.column_size + return SArray2D(a_sarray.values + b_sarray.values, line_size, col_size) diff --git a/pyske/core/util/point_2D.py b/pyske/core/util/point_2D.py new file mode 100644 index 0000000..d0bfeca --- /dev/null +++ b/pyske/core/util/point_2D.py @@ -0,0 +1,91 @@ +""" +A module to represent a 2D point +""" + +from math import sqrt +from pyske.core.util.point_Interface import Point_Interface + + +class Point_2D(Point_Interface): + """A class to represent a 2D point""" + + def __init__(self, x=0, y=0): + self.__x = x + self.__y = y + + def __repr__(self): + return "(%s, %s)" % (self.__x, self.__y) + + def __eq__(self, other): + """ + Equality between two points + """ + if isinstance(other, Point_2D): + return self.__x == other.__x and self.__y == other.__y + return False + + def __add__(self, other): + """ + Addition of two points + + Examples:: + + >>> p1 = Point_2D(5,5) + >>> p2 = Point_2D(5,7) + >>> p1 + p2 + (10, 12) + """ + if isinstance(other, Point_2D): + return Point_2D(self.x + other.x, self.y + other.y) + + def __mul__(self, other): + """ + Multiplication by a point or a scalar + + Examples:: + + >>> p1 = Point_2D(5,5) + >>> p2 = Point_2D(5,7) + >>> p1 * 5 + (25, 25) + >>> p1 * p2 + (25, 35) + """ + if isinstance(other, Point_2D): + return Point_2D(self.x * other.x, self.y * other.y) + if isinstance(other, int) or isinstance(other, float): + return Point_2D(self.x * other, self.y * other) + + def __truediv__(self, other): + if isinstance(other, int): + return Point_2D(self.x / other, self.y / other) + + @property + def x(self): + """X getter""" + return self.__x + + @property + def y(self): + """Y getter""" + return self.__y + + def distance(self, other: 'Point_2D'): + """ + Returns the distance from another point. + + Examples:: + + >>> from pyske.core.util.point_2D import Point_2D + >>> p1 = Point_2D(5,5) + >>> p2 = Point_2D(5,7) + >>> p1.distance(p2) + 2.0 + + :param other: a point + :return: distance from other point + + """ + dx = self.__x - other.x + dy = self.__y - other.y + return sqrt(dx ** 2 + dy ** 2) diff --git a/pyske/core/util/point_3D.py b/pyske/core/util/point_3D.py new file mode 100644 index 0000000..678710d --- /dev/null +++ b/pyske/core/util/point_3D.py @@ -0,0 +1,95 @@ +""" +A module to represent a 3D point +""" + +from math import sqrt +from pyske.core.util.point_Interface import Point_Interface + + +class Point_3D(Point_Interface): + """A class to represent a 3D point""" + + def __init__(self, x=0, y=0, z=0): + self.__x = x + self.__y = y + self.__z = z + + def __repr__(self): + return "(%s, %s, %s)" % (self.__x, self.__y, self.__z) + + def __eq__(self, other): + if isinstance(other, Point_3D): + return self.__x == other.__x and self.__y == other.__y and self.__z == other.__z + return False + + def __add__(self, other): + """ + Addition of two points + + Examples:: + + >>> p1 = Point_3D(5,5,2) + >>> p2 = Point_3D(5,7,1) + >>> p1 + p2 + (10, 12, 3) + """ + if isinstance(other, Point_3D): + return Point_3D(self.x + other.x, self.y + other.y, self.z + other.z) + + def __mul__(self, other): + """ + Multiplication by a point or a scalar + + Examples:: + + >>> p1 = Point_3D(5,5,2) + >>> p2 = Point_3D(5,7,1) + >>> p1 * 5 + (25, 25, 10) + >>> p1 * p2 + (25, 35, 2) + """ + if isinstance(other, Point_3D): + return Point_3D(self.x * other.x, self.y * other.y, self.z * other.z) + if isinstance(other, int) or isinstance(other, float): + return Point_3D(self.x * other, self.y * other, self.z * other.z) + + def __truediv__(self, other): + if isinstance(other, int): + return Point_3D(self.x / other, self.y / other, self.z / other) + + @property + def x(self): + """X getter""" + return self.__x + + @property + def y(self): + """Y getter""" + return self.__y + + @property + def z(self): + """Z getter""" + return self.__z + + def distance(self, other): + """ + Returns the distance from another 3D point. + + Examples:: + + >>> from pyske.core.util.point_2D import Point_2D + >>> p1 = Point_3D(5,5,2) + >>> p2 = Point_3D(5,7,1) + >>> p1.distance(p2) + 2.24 + + :param other: a point + :return: distance from other point + + """ + dx = self.__x - other.x + dy = self.__y - other.y + dz = self.__z - other.z + return sqrt(dx ** 2 + dy ** 2 + dz ** 2) diff --git a/pyske/core/util/point_Interface.py b/pyske/core/util/point_Interface.py new file mode 100644 index 0000000..6196c47 --- /dev/null +++ b/pyske/core/util/point_Interface.py @@ -0,0 +1,25 @@ +""" +A module to represent a point +""" +from abc import ABC + +class Point_Interface(ABC): + """Point interface to represent point of n dimensions""" + + def __repr__(self): + pass + + def __eq__(self, other): + pass + + def __add__(self, other): + pass + + def __mul__(self, other): + pass + + def __truediv__(self, other): + pass + + def distance(self, other): + pass diff --git a/pyske/examples/array/__init__.py b/pyske/examples/array/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyske/examples/array/hello_world.py b/pyske/examples/array/hello_world.py new file mode 100644 index 0000000..7f4f4eb --- /dev/null +++ b/pyske/examples/array/hello_world.py @@ -0,0 +1,63 @@ +""" +Example: various manipulations on a parallel array +""" + +from pyske.core.array.parray2d import PArray2D +from pyske.core.array.array_interface import Distribution +from pyske.core.array.sarray2d import SArray2D + + +def __main(): + col_size = 6 + line_size = 12 + + print("Line initialization") + parray2d_line_init = PArray2D.init(lambda line, column: line * col_size + column, + Distribution.LINE, col_size, line_size) + print(parray2d_line_init) + + print("Line to column distribution") + parray2d_column = parray2d_line_init.distribute(Distribution.COLUMN) + print(parray2d_column) + + print("Column to line distribution") + parray2d_line = parray2d_column.distribute(Distribution.LINE) + print(parray2d_line) + + print("Column initialization") + parray2d_column_init = PArray2D.init(lambda line, column: line * col_size + column, + Distribution.COLUMN, col_size, line_size) + print(parray2d_column_init) + + print("Reduce Test") + print(parray2d_column_init.reduce(lambda x, y: x + y, 0)) + print(parray2d_column_init.reduce(lambda x, y: x + y)) + print(parray2d_line_init.reduce(lambda x, y: x + y, 0)) + print(parray2d_column.reduce(lambda x, y: x + y, 0)) + print(PArray2D().reduce(lambda x, y: x + y, 0)) + + print("Mapped array") + parray2d_map = parray2d_line_init.map(lambda x: x + 1) + print(parray2d_map) + + print("Sarray initialization") + sarray2d = SArray2D.init(lambda line, column: line * col_size + column, Distribution.LINE, + col_size, line_size) + print(sarray2d) + + print("Get partition") + print(parray2d_column_init.get_partition()) + + b_sarray2d = SArray2D.init(lambda line, column: 1, Distribution.LINE, col_size, line_size) + + print("Map2 array") + print(sarray2d.map2(lambda x, y: x + y, b_sarray2d)) + print(parray2d_line_init.map2(lambda x, y: x + y, parray2d_line_init)) + print(parray2d_column_init.map2(lambda x, y: x + y, parray2d_column)) + + print("To seq") + print(parray2d_column.to_seq()) + + +if __name__ == '__main__': + __main() diff --git a/pyske/examples/list/k_means.py b/pyske/examples/list/k_means.py new file mode 100644 index 0000000..a1eab32 --- /dev/null +++ b/pyske/examples/list/k_means.py @@ -0,0 +1,122 @@ +""" +K-Means +""" +import random +from typing import Callable, Tuple + +from pyske.core.interface import List +from pyske.core.list import SList +from pyske.core.util.point_Interface import Point_Interface +from pyske.core.util.par import procs + + +def cluster_index(point: Point_Interface, centroids: SList[Point_Interface]) -> \ + Tuple[Point_Interface, int]: + """ + Get the centroid index of the closest centroid + """ + min_dist = float("inf") + p_centroid = centroids[0] + for centroid in centroids: + if point.distance(centroid) < min_dist: + min_dist = point.distance(centroid) + p_centroid = centroid + return point, centroids.index(p_centroid) + + +def assign_clusters(input_list: List[Point_Interface], centroids: SList[Point_Interface]) -> \ + List[Tuple[Point_Interface, int]]: + """ + Assign each point to a cluster + """ + return input_list.map(lambda x: cluster_index(x, centroids)) + + +def update_centroids(clusters: List[Tuple[Point_Interface, int]], + centroids: SList[Point_Interface]): + """ + Update centroids of clusters + """ + + def centroids_list_update(list_to_update, item): + if isinstance(item, SList): + list_to_update = list_to_update.map2(lambda a_pair, b_pair: (a_pair[0] + b_pair[0], + a_pair[1] + b_pair[1]), + item) + else: + index = item[1] + point = item[0] + list_to_update[index] = (list_to_update[index][0] + point, + list_to_update[index][1] + 1) + return list_to_update + + point_class = type(centroids[0]) + neutral_list = SList.init(lambda _: (point_class(), 0), len(centroids)) + new_centroids = clusters.reduce(lambda a_item, b_item: + centroids_list_update(a_item, b_item), neutral_list) + new_centroids = new_centroids.map(lambda x: x[0] / x[1]) + + return new_centroids + + +def max_dist(pair_a: Tuple[Point_Interface, float], pair_b: Tuple[Point_Interface, float]): + """ + Return the tuple with the maximum distance + """ + if pair_a[1] > pair_b[1]: + return pair_a + return pair_b + + +def k_means_init(input_list: List[Point_Interface], n_cluster: int) -> SList[Point_Interface]: + """ + K-means++ initialization + + :param input_list: a list of points + :param n_cluster: number of clusters + + :return: list of centroids + """ + centroids = SList([]) + first_centroid = input_list.get_partition() \ + .map(lambda l: l[random.randint(0, l.length() - 1)]) \ + .to_seq()[random.randint(0, list(procs())[len(list(procs())) - 1])] + centroids.append(first_centroid) + + for _ in range(n_cluster - 1): + dist = input_list.map(lambda x: x.distance(centroids[0])) + for i in range(1, len(centroids)): + temp_dist = input_list.map(lambda x, index=i: x.distance(centroids[index])) + dist = dist.map2(lambda x, y: y if y < x else x, temp_dist) + + zip_list = input_list.zip(dist) + next_centroid = zip_list.reduce(max_dist)[0] + centroids.append(next_centroid) + + return centroids + + +def k_means(input_list: List[Point_Interface], init_function: Callable[[List, int], List], + n_cluster: int, + max_iter: int = 10) -> List[Tuple[Point_Interface, int]]: + """ + K-means algorithm on a list of points + + :param input_list: a list of points + :param init_function: a function that initialize centroids + :param n_cluster: number of clusters + :param max_iter: number of iterations + + :return: a list of tuples with the point and his cluster index + """ + centroids = init_function(input_list, n_cluster) + + j = 0 + while j < max_iter: + clusters = assign_clusters(input_list, centroids) + + centroids = update_centroids(clusters, centroids) + + j = j + 1 + + return clusters diff --git a/pyske/examples/list/k_means_main.py b/pyske/examples/list/k_means_main.py new file mode 100644 index 0000000..c8b7782 --- /dev/null +++ b/pyske/examples/list/k_means_main.py @@ -0,0 +1,42 @@ +""" +Execution of k_means +""" + +from pyske.core import Timing +from pyske.examples.list.k_means import k_means, k_means_init +from pyske.examples.list import util + +PAR = 'parallel' +SEQ = 'sequential' + + +if __name__ == '__main__': + + parser = util.k_means_parser() + + args = parser.parse_args() + size = args.size + num_iter = args.iter + choice = args.data + clusters = args.clusters + dimensions = args.dimensions + show_clusters = args.show_clusters + + pyske_list_class = util.select_pyske_list(choice) + input_list = util.rand_point_list(pyske_list_class, size, clusters, dimensions) + + timing = Timing() + execute = util.select_execute(choice) + example = k_means + execute(lambda: print('Version:\t', choice)) + for iteration in range(1, 1 + num_iter): + timing.start() + result = example(input_list, k_means_init, clusters) + timing.stop() + util.print_experiment("", timing.get(), execute, iteration) + if show_clusters: + if dimensions == 2: + util.print_2D_result(result.to_seq()) + if dimensions == 3: + util.print_3D_result(result.to_seq()) + diff --git a/pyske/examples/list/util.py b/pyske/examples/list/util.py index 57bed0d..4964c93 100644 --- a/pyske/examples/list/util.py +++ b/pyske/examples/list/util.py @@ -2,6 +2,17 @@ Utility functions for PySke examples """ +from typing import Tuple + +import argparse +import matplotlib.pyplot as plt + +from sklearn.datasets import make_blobs +from pyske.core import Distribution, SList +from pyske.core.support import parallel +from pyske.core.util.point_2D import Point_2D +from pyske.core.util.point_3D import Point_3D + PAR = 'parallel' SEQ = 'sequential' @@ -19,8 +30,6 @@ def standard_parse_command_line(size_arg=True, iter_arg=True, data_arg=True): :param data_arg: (default True) flag to select argument --data :return: (size, iter, ['parallel' | 'sequential']) """ - # pylint: disable=import-outside-toplevel - import argparse parser = argparse.ArgumentParser() if size_arg: parser.add_argument("--size", help="size of the list to generate", @@ -43,6 +52,20 @@ def standard_parse_command_line(size_arg=True, iter_arg=True, data_arg=True): return size, num_iter, data_type +def k_means_parser(): + """ + Parse command line for k-means example. + """ + parser = argparse.ArgumentParser() + parser.add_argument("--size", help="size of the list to generate", type=int, default=5_000) + parser.add_argument("--iter", help="number of iterations", type=int, default=30) + parser.add_argument("--data", help="type of data structure", choices=[PAR, SEQ], default=SEQ) + parser.add_argument("--clusters", help="number of clusters", type=int, default=3) + parser.add_argument("--dimensions", help="point dimensions", type=int, default=2) + parser.add_argument("--show-clusters", help="display the clusters graph of 2D or 3D points", + action="store_true") + return parser + def select_pyske_list(choice): """ Return a PySke list class. @@ -88,6 +111,71 @@ def rand_list(cls, size): import random return cls.init(lambda _: float(random.randint(-100, 100)), size) +def select_point_dimensions(dimensions): + """ + Return a PySke list class. + + :param dimensions: point dimensions + Precondition: dimensions >= 2 + :return: a Point + """ + # pylint: disable=import-outside-toplevel + if dimensions == 3: + from pyske.core.util.point_3D import Point_3D as PointClass + else: + from pyske.core.util.point_2D import Point_2D as PointClass + return PointClass + +def rand_point_list(cls, size, clusters, dimensions): + """ + Return a randomly generated list of points. + + :param cls: the class of the generated list. + :param size: a positive number + Precondition: size >= 0 + :param clusters: number of clusters + :param dimensions: point dimensions + Precondition: dimensions >= 2 + :return: a list of the given class + """ + x, _ = make_blobs(n_samples=size, centers=clusters, n_features=dimensions) + x = x.tolist() + pointclass = select_point_dimensions(dimensions) + x = list(map(lambda y: pointclass(*y), x)) + distr = Distribution().balanced(size) + return cls.from_seq(x).distribute(distr) + +def print_2D_result(clusters_list: SList[Tuple[Point_2D, int]]): + """ + Print experiment of 2 dimension points k-means clustering + """ + if parallel.PID == 0: + x = clusters_list.map(lambda pair: pair[0].x) + y = clusters_list.map(lambda pair: pair[0].y) + colors = clusters_list.map(lambda pair: pair[1]) + plt.scatter(x, y, c=colors) + plt.show() + +def print_3D_result(clusters_list: SList[Tuple[Point_3D, int]]): + """ + Print experiment of 3 dimension points k-means clustering + """ + if parallel.PID == 0: + x = clusters_list.map(lambda pair: pair[0].x) + y = clusters_list.map(lambda pair: pair[0].y) + z = clusters_list.map(lambda pair: pair[0].z) + colors = clusters_list.map(lambda pair: pair[1]) + + # Tracé du résultat en 3D + fig = plt.figure() + ax = fig.add_subplot(projection='3d') # Affichage en 3D + ax.scatter(x, y, z, label='Courbe', marker='d', c=colors) # Tracé des points 3D + plt.title("Points 3D") + ax.set_xlabel('X') + ax.set_ylabel('Y') + ax.set_zlabel('Z') + plt.tight_layout() + plt.show() def print_experiment(result, timing, execute, iteration=None): """