Merge pull request #25 from ltworf/mypy

Mypy
This commit is contained in:
Salvo 'LtWorf' Tomaselli 2020-08-12 19:20:00 +02:00 committed by GitHub
commit 8f46eae9f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 173 additions and 178 deletions

View File

@ -8,4 +8,5 @@ install:
- pip install xtermcolor
script:
- make mypy
- make test

View File

@ -15,6 +15,9 @@ relational_gui/rel_edit.py:
relational_gui/resources.py:
pyrcc5 relational_gui/resources.qrc > relational_gui/resources.py
.PHONY: mypy
mypy:
mypy relational
.PHONY: test
test:

View File

@ -30,29 +30,19 @@
from io import StringIO
from tokenize import generate_tokens
from typing import Tuple, Dict
from typing import Tuple, Dict, List
from relational.relation import Relation
from relational import parser
from relational.parser import Binary, Unary, Node, PRODUCT, \
DIFFERENCE, UNION, INTERSECTION, DIVISION, JOIN, \
JOIN_LEFT, JOIN_RIGHT, JOIN_FULL, PROJECTION, \
SELECTION, RENAME, ARROW
sel_op = (
'//=', '**=', 'and', 'not', 'in', '//', '**', '<<', '>>', '==', '!=', '>=', '<=', '+=', '-=',
'*=', '/=', '%=', 'or', '+', '-', '*', '/', '&', '|', '^', '~', '<', '>', '%', '=', '(', ')', ',', '[', ']')
PRODUCT = parser.PRODUCT
DIFFERENCE = parser.DIFFERENCE
UNION = parser.UNION
INTERSECTION = parser.INTERSECTION
DIVISION = parser.DIVISION
JOIN = parser.JOIN
JOIN_LEFT = parser.JOIN_LEFT
JOIN_RIGHT = parser.JOIN_RIGHT
JOIN_FULL = parser.JOIN_FULL
PROJECTION = parser.PROJECTION
SELECTION = parser.SELECTION
RENAME = parser.RENAME
ARROW = parser.ARROW
def find_duplicates(node, dups=None):
'''
@ -64,41 +54,6 @@ def find_duplicates(node, dups=None):
dups[str(node)] = node
def replace_leaves(node, context):
'''
Node is a parsed tree
context is a dictionary containing
parsed trees as values.
If a name appearing in node appears
also in context, the parse tree is
modified to replace the node with the
subtree found in context.
'''
if node.kind == parser.UNARY:
replace_leaves(node.child, context)
elif node.kind == parser.BINARY:
replace_leaves(node.left, context)
replace_leaves(node.right, context)
elif node.name in context:
replace_node(node, context[node.name])
def replace_node(replace, replacement):
'''This function replaces "replace" node with the node "with",
the father of the node will now point to the with node'''
replace.name = replacement.name
replace.kind = replacement.kind
if replace.kind == parser.UNARY:
replace.child = replacement.child
replace.prop = replacement.prop
elif replace.kind == parser.BINARY:
replace.right = replacement.right
replace.left = replacement.left
def duplicated_select(n: parser.Node) -> Tuple[parser.Node, int]:
'''This function locates and deletes things like
σ a ( σ a(C)) and the ones like σ a ( σ b(C))
@ -107,7 +62,7 @@ def duplicated_select(n: parser.Node) -> Tuple[parser.Node, int]:
in and
'''
changes = 0
while n.name == SELECTION and n.child.name == SELECTION:
while isinstance(n, Unary) and n.name == SELECTION and isinstance(n.child, Unary) and n.child.name == SELECTION:
changes += 1
prop = n.prop
@ -117,7 +72,7 @@ def duplicated_select(n: parser.Node) -> Tuple[parser.Node, int]:
# This adds parenthesis if they are needed
if n.child.prop.startswith('(') or n.prop.startswith('('):
prop = '(%s)' % prop
n = parser.Unary(
n = Unary(
SELECTION,
prop,
n.child.child,
@ -136,38 +91,45 @@ def futile_union_intersection_subtraction(n: parser.Node) -> Tuple[parser.Node,
σ k (R) R --> σ k (R)
'''
changes = 0
if not isinstance(n, Binary):
return n, 0
# Union and intersection of the same thing
if n.name in (UNION, INTERSECTION, JOIN, JOIN_LEFT, JOIN_RIGHT, JOIN_FULL) and n.left == n.right:
return n.left, 1
# selection and union of the same thing
elif (n.name == UNION):
if n.left.name == SELECTION and n.left.child == n.right:
elif n.name == UNION:
if n.left.name == SELECTION and isinstance(n.left, Unary) and n.left.child == n.right:
return n.right, 1
elif n.right.name == SELECTION and n.right.child == n.left:
elif n.right.name == SELECTION and isinstance(n.right, Unary) and n.right.child == n.left:
return n.left, 1
# selection and intersection of the same thing
elif n.name == INTERSECTION:
if n.left.name == SELECTION and n.left.child == n.right:
if n.left.name == SELECTION and isinstance(n.left, Unary) and n.left.child == n.right:
return n.left, 1
elif n.right.name == SELECTION and n.right.child == n.left:
elif n.right.name == SELECTION and \
isinstance(n.right, Unary) and \
n.right.child == n.left:
return n.right, 1
# Subtraction and selection of the same thing
elif n.name == DIFFERENCE and \
isinstance(n, Binary) and \
n.right.name == SELECTION and \
isinstance(n.right, Unary) and \
n.right.child == n.left:
return parser.Unary(
return Unary(
SELECTION,
'(not (%s))' % n.right.prop,
n.right.child), 1
# Subtraction of the same thing or with selection on the left child
elif n.name == DIFFERENCE and (n.left == n.right or (n.left.name == SELECTION and n.left.child == n.right)):
return parser.Unary(
elif n.name == DIFFERENCE and \
isinstance(n, Binary) and \
(n.left == n.right or (n.left.name == SELECTION and isinstance(n.left, Unary) and n.left.child == n.right)):
return Unary(
SELECTION,
'False',
n.get_left_leaf()
@ -182,11 +144,12 @@ def down_to_unions_subtractions_intersections(n: parser.Node) -> Tuple[parser.No
'''
changes = 0
_o = (UNION, DIFFERENCE, INTERSECTION)
if n.name == SELECTION and n.child.name in _o:
l = parser.Unary(SELECTION, n.prop, n.child.left)
r = parser.Unary(SELECTION, n.prop, n.child.right)
if isinstance(n, Unary) and n.name == SELECTION and n.child.name in _o:
assert isinstance(n.child, Binary)
l = Unary(SELECTION, n.prop, n.child.left)
r = Unary(SELECTION, n.prop, n.child.right)
return parser.Binary(n.child.name, l, r), 1
return Binary(n.child.name, l, r), 1
return n, 0
@ -194,8 +157,8 @@ def duplicated_projection(n: parser.Node) -> Tuple[parser.Node, int]:
'''This function locates thing like π i ( π j (R)) and replaces
them with π i (R)'''
if n.name == PROJECTION and n.child.name == PROJECTION:
return parser.Unary(
if isinstance(n, Unary) and n.name == PROJECTION and isinstance(n.child, Unary) and n.child.name == PROJECTION:
return Unary(
PROJECTION,
n.prop,
n.child.child), 1
@ -205,14 +168,14 @@ def duplicated_projection(n: parser.Node) -> Tuple[parser.Node, int]:
def selection_inside_projection(n: parser.Node) -> Tuple[parser.Node, int]:
'''This function locates things like σ j (π k(R)) and
converts them into π k(σ j (R))'''
if n.name == SELECTION and n.child.name == PROJECTION:
child = parser.Unary(
if isinstance(n, Unary) and n.name == SELECTION and isinstance(n.child, Unary) and n.child.name == PROJECTION:
child = Unary(
SELECTION,
n.prop,
n.child.child
)
return parser.Unary(PROJECTION, n.child.prop, child), 0
return Unary(PROJECTION, n.child.prop, child), 0
return n, 0
@ -222,12 +185,17 @@ def swap_union_renames(n: parser.Node) -> Tuple[parser.Node, int]:
and replaces them with
ρ ab(R Q).
Does the same with subtraction and intersection'''
if n.name in (DIFFERENCE, UNION, INTERSECTION) and n.left.name == RENAME and n.right.name == RENAME:
if n.name in (DIFFERENCE, UNION, INTERSECTION) and \
isinstance(n, Binary) and \
n.left.name == RENAME and \
isinstance(n.left, Unary) and\
n.right.name == RENAME and \
isinstance(n.right, Unary):
l_vars = n.left.get_rename_prop()
r_vars = n.right.get_rename_prop()
if r_vars == l_vars:
child = parser.Binary(n.name, n.left.child, n.right.child)
return parser.Unary(RENAME, n.left.prop, child), 1
child = Binary(n.name, n.left.child, n.right.child)
return Unary(RENAME, n.left.prop, child), 1
return n, 0
@ -239,7 +207,7 @@ def futile_renames(n: parser.Node) -> Tuple[parser.Node, int]:
or removes the operation entirely if they all get removed
'''
if n.name == RENAME:
if isinstance(n, Unary) and n.name == RENAME:
renames = n.get_rename_prop()
changes = False
for k, v in renames.items():
@ -261,11 +229,14 @@ def subsequent_renames(n: parser.Node) -> Tuple[parser.Node, int]:
into
ρ ... (A)
'''
if n.name == RENAME and n.child.name == RENAME:
if isinstance(n, Unary) and \
n.name == RENAME and \
isinstance(n.child, Unary) and \
n.child.name == RENAME:
# Located two nested renames.
prop = n.prop + ',' + n.child.prop
child = n.child.child
n = parser.Unary(RENAME, prop, child)
n = Unary(RENAME, prop, child)
# Creating a dictionary with the attributes
renames = n.get_rename_prop()
@ -292,11 +263,11 @@ def subsequent_renames(n: parser.Node) -> Tuple[parser.Node, int]:
return n, 0
class level_string(str):
class LevelString(str):
level = 0
def tokenize_select(expression):
def tokenize_select(expression: str) -> List[LevelString]:
'''This function returns the list of tokens present in a
selection. The expression can contain parenthesis.
It will use a subclass of str with the attribute level, which
@ -304,8 +275,6 @@ def tokenize_select(expression):
g = generate_tokens(StringIO(str(expression)).readline)
l = list(token[1] for token in g)
l.remove('')
# Changes the 'a','.','method' token group into a single 'a.method' token
try:
while True:
@ -316,17 +285,21 @@ def tokenize_select(expression):
except:
pass
r = []
level = 0
for i in range(len(l)):
l[i] = level_string(l[i])
l[i].level = level
for i in l:
if not i:
continue
value = LevelString(i)
value.level = level
if l[i] == '(':
if value == '(':
level += 1
elif l[i] == ')':
elif value == ')':
level -= 1
r.append(value)
return l
return r
def swap_rename_projection(n: parser.Node) -> Tuple[parser.Node, int]:
@ -340,7 +313,10 @@ def swap_rename_projection(n: parser.Node) -> Tuple[parser.Node, int]:
Will also eliminate fields in the rename that are cut in the projection.
'''
if n.name == PROJECTION and n.child.name == RENAME:
if isinstance(n, Unary) and \
n.name == PROJECTION and \
isinstance(n.child, Unary) and \
n.child.name == RENAME:
# π index,name(ρ id➡index(R))
renames = n.child.get_rename_prop()
projections = set(n.get_projection_prop())
@ -356,16 +332,16 @@ def swap_rename_projection(n: parser.Node) -> Tuple[parser.Node, int]:
if i not in projections:
del renames[i]
child = parser.Unary(PROJECTION,'' , n.child.child)
child.set_projection_prop(projections)
n = parser.Unary(RENAME, '', child)
child = Unary(PROJECTION,'' , n.child.child)
child.set_projection_prop(list(projections))
n = Unary(RENAME, '', child)
n.set_rename_prop(renames)
return n, 1
return n, 0
def swap_rename_select(n: parser.Node) -> int:
def swap_rename_select(n: parser.Node) -> Tuple[parser.Node, int]:
'''This function locates things like
σ k(ρ j(R))
and replaces them with
@ -373,7 +349,10 @@ def swap_rename_select(n: parser.Node) -> int:
Renaming the attributes used in the
selection, so the operation is still valid.'''
if n.name == SELECTION and n.child.name == RENAME:
if isinstance(n, Unary) and \
n.name == SELECTION and \
isinstance(n.child, Unary) and \
n.child.name == RENAME:
# This is an inverse mapping for the rename
renames = {v: k for k, v in n.child.get_rename_prop().items()}
@ -384,26 +363,28 @@ def swap_rename_select(n: parser.Node) -> int:
for i in range(len(tokens)):
splitted = tokens[i].split('.', 1)
if splitted[0] in renames:
tokens[i] = renames[splitted[0]]
tokens[i] = LevelString(renames[splitted[0]])
if len(splitted) > 1:
tokens[i] += '.' + splitted[1]
tokens[i] = LevelString(tokens[i] + '.' + splitted[1])
child = parser.Unary(SELECTION, ' '.join(tokens), n.child.child)
return parser.Unary(RENAME, n.child.prop, child), 1
child = Unary(SELECTION, ' '.join(tokens), n.child.child)
return Unary(RENAME, n.child.prop, child), 1
return n, 0
def select_union_intersect_subtract(n: parser.Node) -> int:
def select_union_intersect_subtract(n: parser.Node) -> Tuple[parser.Node, int]:
'''This function locates things like
σ i(a) σ q(a)
and replaces them with
σ (i OR q) (a)
Removing a O() operation like the union'''
if n.name in {UNION, INTERSECTION, DIFFERENCE} and \
if isinstance(n, Binary) and \
n.name in {UNION, INTERSECTION, DIFFERENCE} and \
isinstance(n.left, Unary) and \
n.left.name == SELECTION and \
isinstance(n.right, Unary) and \
n.right.name == SELECTION and \
n.left.child == n.right.child:
d = {UNION: 'or', INTERSECTION: 'and', DIFFERENCE: 'and not'}
op = d[n.name]
@ -423,7 +404,7 @@ def select_union_intersect_subtract(n: parser.Node) -> int:
prop = t_str % (n.left.prop, op, n.right.prop)
else:
prop = '%s %s %s' % (n.left.prop, op, n.right.prop)
return parser.Unary(SELECTION, prop, n.left.child), 1
return Unary(SELECTION, prop, n.left.child), 1
return n, 0
@ -432,18 +413,23 @@ def union_and_product(n: parser.Node) -> Tuple[parser.Node, int]:
A * B A * C = A * (B C)
Same thing with inner join
'''
if n.name == UNION and n.left.name in {PRODUCT, JOIN} and n.left.name == n.right.name:
if isinstance(n, Binary) and \
n.name == UNION and \
isinstance(n.left, Binary) and \
n.left.name in {PRODUCT, JOIN} and \
isinstance(n.right, Binary) and \
n.left.name == n.right.name:
if n.left.left == n.right.left or n.left.left == n.right.right:
l = n.left.right
r = n.right.left if n.left.left == n.right.right else n.right.right
newchild = parser.Binary(UNION, l, r)
return parser.Binary(n.left.name, n.left.left, newchild), 1
newchild = Binary(UNION, l, r)
return Binary(n.left.name, n.left.left, newchild), 1
elif n.left.right == n.right.left or n.left.left == n.right.right:
l = n.left.left
r = n.right.left if n.right.left == n.right.right else n.right.right
newchild = parser.Binary(UNION, l, r)
return parser.Binary(n.left.name, n.left.right, newchild), 1
newchild = Binary(UNION, l, r)
return Binary(n.left.name, n.left.right, newchild), 1
return n, 0
@ -459,37 +445,41 @@ def projection_and_union(n: parser.Node, rels: Dict[str, Relation]) -> Tuple[par
'''
changes = 0
if n.name == UNION and \
isinstance(n, Binary) and \
n.left.name == PROJECTION and \
isinstance(n.left, Unary) and \
n.right.name == PROJECTION and \
isinstance(n.right, Unary) and \
set(n.left.child.result_format(rels)) == set(n.right.child.result_format(rels)):
child = parser.Binary(UNION, n.left.child, n.right.child)
return parser.Unary(PROJECTION, n.right.prop, child), 0
child = Binary(UNION, n.left.child, n.right.child)
return Unary(PROJECTION, n.right.prop, child), 0
return n, 0
def selection_and_product(n: parser.Node, rels: Dict[str, Relation]) -> parser.Node:
def selection_and_product(n: parser.Node, rels: Dict[str, Relation]) -> Tuple[parser.Node, int]:
'''This function locates things like σ k (R*Q) and converts them into
σ l (σ j (R) * σ i (Q)). Where j contains only attributes belonging to R,
i contains attributes belonging to Q and l contains attributes belonging to both'''
if n.name == SELECTION and n.child.name in (PRODUCT, JOIN):
if isinstance(n, Unary) and n.name == SELECTION and \
isinstance(n.child, Binary) and \
n.child.name in (PRODUCT, JOIN):
l_attr = n.child.left.result_format(rels)
r_attr = n.child.right.result_format(rels)
tokens = tokenize_select(n.prop)
groups = []
temp = []
groups: List[List[LevelString]] = []
temp: List[LevelString] = []
for i in tokens:
if i == 'and' and i.level == 0:
for k in tokens:
if k == 'and' and k.level == 0:
groups.append(temp)
temp = []
else:
temp.append(i)
if len(temp) != 0:
temp.append(k)
if len(temp):
groups.append(temp)
temp = []
del temp
left = []
right = []
@ -500,10 +490,10 @@ def selection_and_product(n: parser.Node, rels: Dict[str, Relation]) -> parser.N
r_fields = False # has fields in left?
for j in set(i).difference(sel_op):
j = j.split('.')[0]
if j in l_attr: # Field in left
t = j.split('.')[0]
if t in l_attr: # Field in left
l_fields = True
if j in r_attr: # Field in right
if t in r_attr: # Field in right
r_fields = True
if l_fields and not r_fields:
@ -518,7 +508,7 @@ def selection_and_product(n: parser.Node, rels: Dict[str, Relation]) -> parser.N
l_prop = ' and '.join((' '.join(i) for i in left))
if '(' in l_prop:
l_prop = '(%s)' % l_prop
l_node = parser.Unary(SELECTION, l_prop, n.child.left)
l_node: Node = Unary(SELECTION, l_prop, n.child.left)
else:
l_node = n.child.left
@ -527,18 +517,18 @@ def selection_and_product(n: parser.Node, rels: Dict[str, Relation]) -> parser.N
r_prop = ' and '.join((' '.join(i) for i in right))
if '(' in r_prop:
r_prop = '(%s)' % r_prop
r_node = parser.Unary(SELECTION, r_prop, n.child.right)
r_node: Node = Unary(SELECTION, r_prop, n.child.right)
else:
r_node = n.child.right
b_node = parser.Binary(n.child.name, l_node, r_node)
b_node = Binary(n.child.name, l_node, r_node)
# Changing main selection
if both:
both_prop = ' and '.join((' '.join(i) for i in both))
if '(' in both_prop:
both_prop = '(%s)' % both_prop
r = parser.Unary(SELECTION, both_prop, b_node)
r = Unary(SELECTION, both_prop, b_node)
return r, len(left) + len(right)
else: # No need for general select
return b_node, 1
@ -550,7 +540,7 @@ def useless_projection(n: parser.Node, rels: Dict[str, Relation]) -> Tuple[parse
'''
Removes projections that are over all the fields
'''
if n.name == PROJECTION and \
if isinstance(n, Unary) and n.name == PROJECTION and \
set(n.child.result_format(rels)) == set(i.strip() for i in n.prop.split(',')):
return n.child, 1

View File

@ -36,6 +36,7 @@ def optimize_program(code, rels: Dict[str, Relation]):
Optimize an entire program, composed by multiple expressions
and assignments.
'''
raise NotImplementedError()
lines = code.split('\n')
context = {}
@ -71,24 +72,19 @@ def optimize_all(expression: Union[str, Node], rels: Dict[str, Relation], specif
else:
raise (TypeError("expression must be a string or a node"))
if isinstance(debug, list):
dbg = True
else:
dbg = False
total = 1
while total != 0:
total = 0
if specific:
for i in optimizations.specific_optimizations:
n, c = recursive_scan(i, n, rels)
if c != 0 and dbg:
if c != 0 and isinstance(debug, list):
debug.append(str(n))
total += c
if general:
for i in optimizations.general_optimizations:
n, c = recursive_scan(i, n, None)
if c != 0 and dbg:
for j in optimizations.general_optimizations:
n, c = recursive_scan(j, n, None)
if c != 0 and isinstance(debug, list):
debug.append(str(n))
total += c
if tostr:

View File

@ -24,7 +24,7 @@
#
# Language definition here:
# http://ltworf.github.io/relational/grammar.html
from typing import Optional, Union, List, Any, Dict
from typing import Optional, Union, List, Any, Dict, Literal
from dataclasses import dataclass
from relational import rtypes
@ -43,6 +43,7 @@ SELECTION = 'σ'
RENAME = 'ρ'
ARROW = ''
b_operators = (PRODUCT, DIFFERENCE, UNION, INTERSECTION, DIVISION,
JOIN, JOIN_LEFT, JOIN_RIGHT, JOIN_FULL) # List of binary operators
u_operators = (PROJECTION, SELECTION, RENAME) # List of unary operators
@ -142,29 +143,28 @@ class Node:
if isinstance(self, Variable): #FIXME this is ugly
return list(rels[self.name].header)
elif isinstance(self, Binary) and self.name in (DIFFERENCE, UNION, INTERSECTION):
elif isinstance(self, Binary):
if self.name in (DIFFERENCE, UNION, INTERSECTION):
return self.left.result_format(rels)
elif isinstance(self, Binary) and self.name == DIVISION:
elif self.name == DIVISION:
return list(set(self.left.result_format(rels)) - set(self.right.result_format(rels)))
elif self.name == PROJECTION:
return self.get_projection_prop()
elif self.name == PRODUCT:
return self.left.result_format(rels) + self.right.result_format(rels)
elif self.name in (JOIN, JOIN_LEFT, JOIN_RIGHT, JOIN_FULL):
return list(set(self.left.result_format(rels)).union(set(self.right.result_format(rels))))
elif isinstance(self, Unary):
if self.name == PROJECTION:
return self.get_projection_prop()
elif self.name == SELECTION:
return self.child.result_format(rels)
elif self.name == RENAME:
_vars = {}
for i in self.prop.split(','):
q = i.split(ARROW)
_vars[q[0].strip()] = q[1].strip()
_vars = self.get_rename_prop()
_fields = self.child.result_format(rels)
for i in range(len(_fields)):
if _fields[i] in _vars:
_fields[i] = _vars[_fields[i]]
return _fields
elif self.name in (JOIN, JOIN_LEFT, JOIN_RIGHT, JOIN_FULL):
return list(set(self.left.result_format(rels)).union(set(self.right.result_format(rels))))
raise ValueError('What kind of alien object is this?')
def __eq__(self, other): #FIXME
@ -194,6 +194,7 @@ class Variable(Node):
@dataclass
class Binary(Node):
name: str
left: Node
right: Node
@ -214,6 +215,7 @@ class Binary(Node):
@dataclass
class Unary(Node):
name: str
prop: str
child: Node
@ -281,6 +283,7 @@ def parse_tokens(expression: List[Union[list, str]]) -> Node:
# The list contains only 1 string. Means it is the name of a relation
if len(expression) == 1:
assert isinstance(expression[0], str)
if not rtypes.is_valid_relation_name(expression[0]):
raise ParserException(
f'{expression[0]!r} is not a valid relation name')
@ -306,7 +309,7 @@ def parse_tokens(expression: List[Union[list, str]]) -> Node:
if len(expression[i + 1:]) == 0:
raise ParserException(
f'Expected right operand for {expression[i]!r}')
return Binary(expression[i], parse_tokens(expression[:i]), parse_tokens(expression[i + 1:]))
return Binary(expression[i], parse_tokens(expression[:i]), parse_tokens(expression[i + 1:])) # type: ignore
'''Searches for unary operators, parsing from right to left'''
for i in range(len(expression) - 1, -1, -1):
if expression[i] in u_operators: # Unary operator
@ -315,9 +318,9 @@ def parse_tokens(expression: List[Union[list, str]]) -> Node:
f'Expected more tokens in {expression[i]!r}')
return Unary(
expression[i],
prop=expression[1 + i].strip(),
child=parse_tokens(expression[2 + i])
expression[i], # type: ignore
prop=expression[1 + i].strip(), # type: ignore
child=parse_tokens(expression[2 + i]) # type: ignore
)
raise ParserException(f'Parse error on {expression!r}')

View File

@ -22,12 +22,13 @@
import csv
from itertools import chain, repeat
from collections import deque
from typing import List, Union, Set
from typing import *
from pathlib import Path
from relational.rtypes import *
class Relation (object):
class Relation:
'''
This object defines a relation (as a group of consistent tuples) and operations.
@ -51,13 +52,14 @@ class Relation (object):
An empty relation needs a header, and can be filled using the insert()
method.
'''
__hash__ = None # type: None
def __hash__(self):
raise NotImplementedError()
def __init__(self, filename : str = '') -> None:
def __init__(self, filename: Optional[Union[str, Path]] = None) -> None:
self._readonly = False
self.content = set() # type: Set[tuple]
self.content: Set[tuple] = set()
if len(filename) == 0: # Empty relation
if filename is None: # Empty relation
self.header = Header([])
return
with open(filename) as fp:
@ -73,7 +75,7 @@ class Relation (object):
self._readonly = True
copy._readonly = True
def _make_writable(self, copy_content : bool = True) -> None:
def _make_writable(self, copy_content: bool = True) -> None:
'''If this relation is marked as readonly, this
method will copy the content to make it writable too
@ -92,7 +94,7 @@ class Relation (object):
def __contains__(self, key):
return key in self.content
def save(self, filename: str) -> None:
def save(self, filename: Union[Path, str]) -> None:
'''
Saves the relation in a file. Will save using the csv
format as defined in RFC4180.
@ -169,7 +171,7 @@ class Relation (object):
newt.content.add(i + j)
return newt
def projection(self, * attributes) -> 'Relation':
def projection(self, *attributes) -> 'Relation':
'''
Can be called in two different ways:
a.projection('field1','field2')
@ -200,7 +202,7 @@ class Relation (object):
newt.content.add(tuple(row))
return newt
def rename(self, params: 'Relation') -> 'Relation':
def rename(self, params: Dict[str, str]) -> 'Relation':
'''
Takes a dictionary.
@ -505,7 +507,7 @@ class Header(tuple):
def __repr__(self):
return "Header(%s)" % super(Header, self).__repr__()
def rename(self, params) -> 'Header':
def rename(self, params: Dict[str, str]) -> 'Header':
'''Returns a new header, with renamed fields.
params is a dictionary of {old:new} names
@ -525,15 +527,15 @@ class Header(tuple):
'''Returns how many attributes this header has in common with a given one'''
return len(set(self).intersection(set(other)))
def union(self, other) -> set:
def union(self, other: 'Header') -> Set[str]:
'''Returns the union of the sets of attributes with another header.'''
return set(self).union(set(other))
def intersection(self, other) -> set:
def intersection(self, other: 'Header') -> Set[str]:
'''Returns the set of common attributes with another header.'''
return set(self).intersection(set(other))
def getAttributesId(self, param) -> List[int]:
def getAttributesId(self, param: Iterable[str]) -> List[int]:
'''Returns a list with numeric index corresponding to field's name'''
try:
return [self.index(i) for i in param]