Commit 95919d11 authored by Mehrdad's avatar Mehrdad

added rulemall and solved initiation error

parent 5ece5fc8
from app.main import main from app.main import main
from app.main.utils.common import response_message from app.main.utils.common import response_message
from flask import request from flask import request
from rulemall import RulesGroup, Rule from app.rulemall import RulesGroup, Rule
from app.main.utils.db_util import add_obj, return_obj, rm_obj, return_all_objs, return_project_rules from app.main.utils.db_util import add_obj, return_obj, rm_obj, return_all_objs, return_project_rules
......
from app.main import main from app.main import main
from app.main.utils.common import response_message, read_data, read_rules, write_validations from app.main.utils.common import response_message, read_data, read_rules, write_validations
from rulemall import RulesGroup from app.rulemall import RulesGroup
import pandas as pd import pandas as pd
......
from app.main import main from app.main import main
from app.main.utils.common import response_message from app.main.utils.common import response_message
from flask import json from flask import json
from rulemall.validators import * from app.rulemall.validators import *
@main.route('/validators', methods=['GET', 'POST']) @main.route('/validators', methods=['GET', 'POST'])
...@@ -9,7 +9,7 @@ def validators(): ...@@ -9,7 +9,7 @@ def validators():
vals = [] vals = []
for val in [Range, Length, Category, Regex, Missing, Unique, Empty, Logic]: for val in [Range, Length, Category, Regex, Missing, Unique, Empty, Logic]:
v = val() v = val()
vals.append(json.loads(str(v))) vals.append(json.loads(v.__repr__()))
return response_message(vals) return response_message(vals)
......
from .rules import *
import pandas as pd
from app.rulemall.validators import *
import json
class RuleSet:
def __init__(self, data, rule_groups=[]):
self.data = data
self.rule_groups = rule_groups
def to_dict(self):
output = []
for rule_group in self.rule_groups:
output.append(rule_group.to_dict())
return output
def from_dict(self, rule_groups):
self.rule_groups = []
for rule_group in rule_groups:
rg = RulesGroup()
rg.from_dict(rule_group)
rg.reset_data(self.data)
self.rule_groups.append(rg)
class RulesGroup:
def __init__(self, name='', data=None, rules=None, filters=None, by=None):
self.name = name
self.data = data
self.by = by
self.rules = rules
self.filters = filters
def __add__(self, rule):
self.rules = AndOperator([self.rules, rule])
def reset_data(self, data=None):
self.data = data
def validate(self):
if self.data is not None:
data = self.data
if self.filters:
filters, _ = self.filters.validate(data)
data = data.loc[filters]
if self.rules:
return self.rules.validate(data)
return None
def get_target(self):
if self.rules:
return self.rules.get_target()
def to_dict(self):
d = {'name': self.name, 'rules': [], 'filters': []}
for rule in self.rules:
d['rules'].append(rule.to_dict())
for f in self.filters:
d['filters'].append(f.to_dict())
return d
def reset_rules(self, rules):
self.rules = None
if 'AND' in rules:
a = AndOperator(operands=[])
elif 'OR' in rules:
a = OrOperator(operands=[])
else:
try:
a = Rule()
except Exception as e:
print(e)
raise TypeError(str(e))
try:
a.from_dict(rules.copy())
except Exception as e:
print(e)
raise TypeError(str(e))
self.rules = a
def reset_filters(self, filters):
self.filters = None
if 'AND' in filters:
a = AndOperator()
elif 'OR' in filters:
a = OrOperator()
else:
a = Rule()
try:
a.from_dict(filters.copy())
self.filters = a
except Exception as e:
print(e)
raise TypeError(str(e))
def from_dict(self, dct):
self.name = dct['name']
rules = dct['rules']
filters = dct.get('filters')
self.reset_rules(rules)
try:
self.reset_filters(filters)
except Exception:
self.filters = None
# TODO: complete from config method
# TODO: Add a method to return the names of all rules withing the object
# TODO: Get rule applicator from factory of adaptor
class Rule:
def __init__(self, name=None, target=None, validator=None, invert=False, orient='column'):
self.name = name
self.target = target
self.validator = validator
self.orient = orient
self.invert = invert
self.vdf = pd.DataFrame()
def validate(self, s):
if self.orient == 'column':
if self.invert:
self.vdf[self.name] = ~self.validator.validate(s[self.target])
self.vdf['responsible'] = self.vdf[self.name].apply(lambda x: [] if x else [self.name])
else:
self.vdf[self.name] = self.validator.validate(s[self.target])
self.vdf['responsible'] = self.vdf[self.name].apply(lambda x: [] if x else [self.name])
elif self.orient == 'record':
if self.invert:
self.vdf[self.name] = ~self.validator.validate(s)
self.vdf['responsible'] = self.vdf[self.name].apply(lambda x: [] if x else [self.name])
else:
self.vdf[self.name] = self.validator.validate(s)
self.vdf['responsible'] = self.vdf[self.name].apply(lambda x: [] if x else [self.name])
return self.vdf[self.name], self.vdf['responsible']
def get_target(self):
return self.target
def to_dict(self):
d = self.validator.to_dict()
d['rule_attributes'] = {"name": self.name, "target": self.target, "orient": self.orient, "invert": self.invert}
return d
def from_dict(self, dct):
rule_type = dct['rule_type']
if rule_type == 'Range':
validator = Range(**dct['validator_parameters'])
elif rule_type == 'Regex':
validator = Regex(**dct['validator_parameters'])
elif rule_type == 'Length':
validator = Length(**dct['validator_parameters'])
elif rule_type == 'Unique':
validator = Unique(**dct['validator_parameters'])
elif rule_type == 'Category':
validator = Category(**dct['validator_parameters'])
elif rule_type == 'Compare':
validator = Compare(**dct['validator_parameters'])
elif rule_type == 'Empty':
validator = Empty()
elif rule_type == 'Logic':
validator = Logic(**dct['validator_parameters'])
else:
raise NotImplementedError
self.validator = validator
self.name = dct['rule_attributes']['name']
self.target = dct['rule_attributes']['target']
self.invert = dct['rule_attributes'].get('invert', False)
self.orient = dct['rule_attributes'].get('orient', 'column')
if isinstance(self.invert, str):
self.invert = json.loads(self.invert)
class OperatorBaseClass(metaclass=ABCMeta):
def __init__(self, operands=[]):
self.operands = operands
@staticmethod
def operation(data, other):
raise NotImplementedError
def get_target(self):
columns = []
for op in self.operands:
a = op.get_target()
if isinstance(a, list):
columns.extend(a)
else:
columns.append(a)
return columns
def validate(self, data):
if self.operands:
t, r = self.operands[0].validate(data)
for op in self.operands[1:]:
v1, v2 = op.validate(data)
r = pd.concat([r, v2.rename('r')], axis=1)
t = self.operation(t, v1)
r.loc[t, 'responsible'] = r.loc[t, 'responsible'].apply(lambda x: [])
r.loc[~t].apply(lambda x: x['responsible'].extend(x['r']), axis=1)
r.loc[~t, 'responsible'] = r.loc[~t, 'responsible'].apply(lambda x: list(set(x)))
r = r['responsible']
return t, r
else:
return None, None
def _to_dict(self):
res = []
for op in self.operands:
res.append(op.to_dict())
return res
def _from_dict(self, dct):
for op in dct:
if 'AND' in op:
a = AndOperator([])
elif 'OR' in op:
a = OrOperator([])
else:
try:
a = Rule()
except Exception as e:
print(e)
raise TypeError('Not a Rule!')
a.from_dict(op.copy())
self.operands.append(a)
class OrOperator(OperatorBaseClass):
@staticmethod
def operation(data, other):
return data | other
def to_dict(self):
return {'OR': self._to_dict()}
def from_dict(self, dct):
dct = dct['OR'].copy()
self._from_dict(dct)
class AndOperator(OperatorBaseClass):
@staticmethod
def operation(data, other):
return data & other
def to_dict(self):
return {'AND': self._to_dict()}
def from_dict(self, dct):
dct = dct['AND'].copy()
self._from_dict(dct)
from functools import wraps
from time import time
def timing(f):
@wraps(f)
def wrap(*args, **kwargs):
ts = time()
result = f(*args, **kwargs)
te = time()
print(f'func: {f.__name__} took {te-ts} seconds.')
return result
return wrap
from rulemall.validators import Range, Unique, Regex, Length, Category
from rulemall.rules import Rule
import json
import yaml
def rule_builder(configs):
if isinstance(configs, list):
output = []
for config in configs:
output.append(config_reader(config))
elif isinstance(configs, dict):
return config_reader(configs)
else:
return TypeError
def config_reader(configs):
name = configs.get('name')
rules = list(map(define_rule, [config for config in configs['rules']]))
rules = list(filter(lambda rule: rule is not None, rules))
return {'rules': rules, 'name': name}
def define_rule(config):
rule_type = config['rule_type']
if rule_type == 'Range':
validator = Range(**config['validator_parameters'])
elif rule_type == 'Regex':
validator = Regex(**config['validator_parameters'])
elif rule_type == 'Length':
validator = Length(**config['validator_parameters'])
elif rule_type == 'Unique':
validator = Unique(**config['validator_parameters'])
elif rule_type == 'Category':
validator = Category(**config['validator_parameters'])
else:
return None
rule = Rule(validator=validator, **config['rule_attributes'])
return rule
import math
import re
from abc import ABCMeta, abstractmethod
import numpy as np
import pandas as pd
import json
import operator
class ValidatorBaseClass(metaclass=ABCMeta):
@abstractmethod
def validate(self, x):
raise NotImplementedError()
@abstractmethod
def to_dict(self):
return NotImplementedError()
class Range(ValidatorBaseClass):
def __init__(self, minimum=None, maximum=None, left_inclusive=True, right_inclusive=False):
self.min = minimum
self.max = maximum
self.li = left_inclusive
self.ri = right_inclusive
self.dct = {'min': self.min, 'max': self.max, 'li': self.li, 'ri': self.ri}
for val in self.dct.keys():
if isinstance(self.dct[val], str):
if self.dct[val] != '':
self.__setattr__(val, json.loads(self.dct[val]))
else:
self.__setattr__(val, None)
self.min = self.min if pd.notna(self.min) else -math.inf
self.max = self.max if pd.notna(self.max) else math.inf
def validate(self, x):
y = pd.to_numeric(x.copy())
if self.li and not self.ri:
return (self.min <= y) & (y < self.max)
elif self.li and self.ri:
return (self.min <= y) & (y <= self.max)
elif not self.li and self.ri:
return (self.min < y) & (y <= self.max)
else:
return (self.min < y) & (y < self.max)
def to_dict(self):
return {'rule_type': 'Range', 'validator_parameters': {'maximum': self.max, 'minimum': self.min,
'left_inclusive': self.li, 'right_inclusive': self.ri}}
def __repr__(self):
return json.dumps({'name': 'بازه عددی', 'desc': 'Lorem Ipsom', 'example': 'Lorem Ipson', 'code': 'Range',
'params': [{'name': 'کمینه', 'code': 'minimum', 'default': '-∞', 'type': 'number', 'tag': 'input'},
{'name': 'بیشینه', 'code': 'maximum', 'default': '∞', 'type': 'number', 'tag': 'input'},
{'name': 'شامل کمینه', 'code': 'left_inclusive', 'default': True, 'type': 'bool',
'tag': 'select'},
{'name': 'شامل بیشینه', 'code': 'right_inclusive', 'default': True, 'type': 'bool',
'tag': 'select'}
]})
class Logic(ValidatorBaseClass):
def __init__(self, op_type='eq', value=None):
self.op_type = op_type
self.value = value
def validate(self, x):
if self.op_type == 'lt':
self.value = int(self.value)
return operator.lt(x, self.value)
elif self.op_type == 'le':
self.value = int(self.value)
return operator.le(x, self.value)
elif self.op_type == 'eq':
return operator.eq(x, self.value)
elif self.op_type == 'nq':
return operator.nq(x, self.value)
elif self.op_type == 'ge':
self.value = int(self.value)
return operator.ge(x, self.value)
elif self.op_type == 'gt':
self.value = int(self.value)
return operator.gt(x, self.value)
def to_dict(self):
return {'rule_type': 'Logic', 'validator_parameters': {'op_type': self.op_type,
'value': self.value}}
def __repr__(self):
return json.dumps({'name': 'عملگرهای منطقی', 'desc': 'Lorem Ipsom', 'example': 'Lorem Ipson', 'code': 'Logic',
'params': [{'name': 'نوع عملگر', 'code': 'op_type', 'default': '', 'type': 'text', 'tag': 'select'},
{'name': 'عبارت مقایسه کننده', 'code': 'comapare_item', 'default': '', 'type': 'text',
'tag': 'textarea'}]})
class Length(ValidatorBaseClass):
def __init__(self, min_len=None, max_len=None, left_inclusive=True, right_inclusive=False):
self.min_len = min_len
self.max_len = max_len
self.li = left_inclusive
self.ri = right_inclusive
self.dct = {'max_len': self.max_len, 'min_len': self.min_len, 'li': self.li, 'ri': self.ri}
for val in self.dct.keys():
if isinstance(self.dct[val], str):
if self.dct[val] != '':
self.__setattr__(val, json.loads(self.dct[val]))
else:
self.__setattr__(val, None)
self.min_len = self.min_len if pd.notna(self.min_len) else 0
self.max_len = self.max_len if pd.notna(self.max_len) else math.inf
def validate(self, x):
if self.li and not self.ri:
return (self.min_len <= x.str.len()) & (x.str.len() < self.max_len)
elif self.li and self.ri:
return (self.min_len <= x.str.len()) & (x.str.len() <= self.max_len)
elif not self.li and self.ri:
return (self.min_len < x.str.len()) & (x.str.len() <= self.max_len)
else:
return (self.min_len < x.str.len()) & (x.str.len() < self.max_len)
def to_dict(self):
return {'rule_type': 'Length', 'validator_parameters': {'max_len': self.max_len, 'min_len': self.min_len,
'left_inclusive': self.li, 'right_inclusive': self.ri}}
def __repr__(self):
return json.dumps({'name': 'طول بازه', 'desc': 'Lorem Ipsom', 'example': 'Lorem Ipson', 'code': 'Length',
'params': [{'name': 'کمینه', 'code': 'min_len', 'default': 0, 'type': 'number', 'tag': 'input'},
{'name': 'بیشینه', 'code': 'max_len', 'default': '∞', 'type': 'number', 'tag': 'input'},
{'name': 'شامل کمینه', 'code': 'left_inclusive', 'default': True, 'type': 'bool', 'tag': 'select'},
{'name': 'شامل بیشینه', 'code': 'right_inclusive', 'default': True, 'type': 'bool', 'tag': 'select'}
]})
class Category(ValidatorBaseClass):
def __init__(self, categories=None):
self.categories = categories or []
def validate(self, x):
y = x.copy().astype('str')
return y.isin(self.categories)
def to_dict(self):
return {'rule_type': 'Category', 'validator_parameters': {'categories': self.categories}}
def __repr__(self):
return json.dumps({'name': 'گروه‌بندی', 'desc': 'Lorem Ipsom', 'example': 'Lorem Ipson', 'code': 'Category',
'params': [{'name': 'مقادیر', 'code': 'categories', 'default': [], 'type': 'list', 'tag': 'tagbox'}]})
class Regex(ValidatorBaseClass):
def __init__(self, regex=''):
self.regex = re.compile(regex)
def validate(self, x):
return x.apply(lambda record: bool(re.search(self.regex, record)))
def to_dict(self):
return {'rule_type': 'Regex', 'validator_parameters': {'regex': self.regex}}
def __repr__(self):
return json.dumps({'name': 'Regex', 'desc': 'Lorem Ipsom', 'example': 'Lorem Ipson', 'code': 'Regex',
'params': [{'name': 'متن', 'code': 'regex', 'default': '', 'type': 'text', 'tag': 'textarea'}]})
class Missing(ValidatorBaseClass):
def __init__(self, missing=np.nan):
self.missing = missing
def validate(self, x):
# TODO this wouldn't work for np.nan because np.nan!=np.nan
return x == self.missing
def to_dict(self):
return {'rule_type': 'Missing', 'validator_parameters': {'missing': self.missing}}
def __repr__(self):
return json.dumps({'name': 'گمشده', 'desc': 'Lorem Ipsom', 'example': 'Lorem Ipson', 'code': 'Missing',
'params': [{'name': 'مقدار گمشده', 'code': 'missing', 'default': np.nan, 'type': 'text', 'tag': 'input'}]})
class Unique:
def __init__(self, keep=False):
self.keep = keep
def validate(self, s):
return s.duplicated(keep=self.keep)
# return lambda x: x.duplicated(keep=self.keep)
def to_dict(self):
return {'rule_type': 'Unique', 'validator_parameters': {'keep': self.keep}}
def __repr__(self):
return json.dumps({'name': 'یکتایی', 'desc': 'Lorem Ipsom', 'example': 'Lorem Ipson', 'code': 'Unique',
'params': []})
class Empty(ValidatorBaseClass):
def validate(self, x):
return pd.isna(x)
def to_dict(self):
return {'rule_type': 'Empty'}
def __repr__(self):
return json.dumps({'name': 'خالی بودن', 'desc': 'Lorem Ipsom', 'example': 'Lorem Ipson', 'code': 'Empty',
'params': []})
class Compare(ValidatorBaseClass):
def __init__(self, operation='='):
self.operation = operation
def validate(self, x):
if self.operation == '=':
return x[x.columns[0]] == x[x.columns[1]]
elif self.operation == '>':
return x[x.columns[0]] > x[x.columns[1]]
elif self.operation == '<':
return x[x.columns[0]] < x[x.columns[1]]
raise NotImplementedError
def to_dict(self):
return {'rule_type': 'Compare', 'validator_parameters': {'operation': self.operation}}
# class Bool:
# def __init__(self, operator=None):
# self.operator = operator or 'or'
#
# def validate(self, x, y):
# if self.operator == 'and':
# return x & y
# elif self.operator == 'or':
# return x | y
# class Math:
# def __init__(self, operator=None):
# self.operator = operator
#
# def validate(self, x, y):
# if self.operator == 'sum':
# return x + y
# elif self.operator == 'subtract':
# return x - y
# elif self.operator == 'multiply':
# return x * y
# elif self.operator == 'divide':
# return x / y
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment