# utils.py
#
# Copyright 2020 Anthony "antcer1213" Cervantes <anthony.cervantes@cerver.info>
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
#
from bson import json_util, SON
from traceback import format_exc as _traceback
import datetime
import string
import re
import os
import io
import uuid
from enum import Enum
from typing import List, Optional, Sequence, Type, TypeVar, Union
from bson.objectid import ObjectId
from .vars import TYPES, SCHEMA_TYPES
import inspect
import yaml
import mimetypes
import urllib
import logging
# Module logger, registered under the "cervmongo" logger name.
logger = logging.getLogger("cervmongo")
# Translation table for str.translate that deletes every character in string.punctuation.
PUNCTUATION_TRANSLATOR = str.maketrans('', '', string.punctuation)
# MIME type string for arbitrary binary data.
GENERIC_MIMETYPE = "application/octet-stream"
def detect_mimetype(filename) -> str:
    """Return the MIME type of a file path or an open file-like object.

    Detection is attempted with the optional ``magic`` package first. If that
    is unavailable or fails for any reason, the type is guessed from the file
    name via ``mimetypes``. The function always returns a string; when nothing
    can be determined it returns ``'application/octet-stream'``.

    Args:
        filename: a path string, or a file-like object. A file-like object is
            rewound to position 0 after its content has been sampled.

    Returns:
        The detected MIME type string.
    """
    fallback = 'application/octet-stream'
    mimetype = None
    try:
        import magic
        mime = magic.Magic(magic_file="bin/magic", mime=True)
        if isinstance(filename, str):
            mimetype = mime.from_file(filename)
        else:
            try:
                # content sniffing needs the leading bytes, not the file object itself
                mimetype = mime.from_buffer(filename.read(2048))
            finally:
                filename.seek(0)
    except Exception:
        # Best-effort fallback: guess from the name. File objects are only
        # usable here when they expose a string ``name`` attribute.
        if isinstance(filename, str):
            name = filename
        else:
            name = getattr(filename, "name", None)
        if isinstance(name, str):
            mimetype = mimetypes.guess_type(name)[0]
    return mimetype or fallback
def flatten_dict(dictionary: dict) -> dict:
    """Flatten nested dictionaries into a single level with dot-joined keys.

    ``{"a": {"b": {"c": 1}}, "d": 2}`` becomes ``{"a.b.c": 1, "d": 2}``.
    Only leaf values are emitted; intermediate dictionaries do not get an
    entry of their own. Lists and tuples are kept as values and are not
    expanded. An empty dictionary is kept as a leaf value so that no key is
    lost.

    Args:
        dictionary: the (possibly nested) mapping to flatten.

    Returns:
        A new flat dictionary; the input is not modified.
    """
    flat = {}
    for key, value in dictionary.items():
        if isinstance(value, dict) and value:
            # recurse first, then prefix every flattened sub-key with this key
            for subkey, subvalue in flatten_dict(value).items():
                flat["{}.{}".format(key, subkey)] = subvalue
        else:
            flat[key] = value
    return flat
def file_and_fileobj(fileobj):
    """Normalize a path string or a stream into something ready to read.

    * A string naming an existing path is opened in binary read mode and the
      open file is returned.
    * Any other string is returned unchanged.
    * An ``io`` stream is rewound to position 0 and returned.

    Raises:
        TypeError: if ``fileobj`` is neither a string nor an ``io`` stream.
    """
    stream_types = (io.TextIOBase, io.BufferedIOBase, io.RawIOBase, io.IOBase)

    if isinstance(fileobj, str):
        return open(fileobj, 'rb') if os.path.exists(fileobj) else fileobj

    if not isinstance(fileobj, stream_types):
        raise TypeError("fileobj is not a valid str or file-like obj; received '{}'".format(type(fileobj)))

    fileobj.seek(0)
    return fileobj
def dict_to_query(dictionary:dict) -> str:
    """Encode a dictionary as a URL query string (``key=value&key2=value2``).

    Args:
        dictionary: mapping of query parameter names to values.

    Returns:
        The percent-encoded query string, without a leading ``?``.
    """
    # Imported explicitly: a bare ``import urllib`` does not guarantee that
    # the ``urllib.parse`` submodule has been loaded.
    from urllib.parse import urlencode
    return urlencode(dictionary)
def sort_list(item, field:str):
    """Look up a dot-separated path inside nested containers.

    Each path segment made only of digits is used as an integer index; any
    other segment is used as a key, provided it is contained in the current
    container. The name suggests use as a sort-key helper, but nothing in
    the function depends on that.

    Args:
        item: the container to read from.
        field: dot-separated path, e.g. ``"address.lines.0"``.

    Returns:
        The value found at the path, or ``None`` when any segment is missing
        or the lookup fails for any other reason.
    """
    try:
        current = item
        for part in field.split("."):
            if part.isdigit():
                current = current[int(part)]
            elif part in current:
                current = current[part]
            else:
                return None
        return current
    except Exception:
        # Deliberate best-effort: an unresolvable path yields None. Only
        # Exception is caught so KeyboardInterrupt/SystemExit still propagate.
        return None
def getenv_boolean(var_name, default_value=False):
    """Read an environment variable as a boolean flag.

    Returns ``default_value`` when the variable is not set. Otherwise returns
    True only if the value, compared case-insensitively, is ``TRUE`` or ``1``.
    """
    raw = os.getenv(var_name)
    if raw is None:
        return default_value
    return raw.upper() in ("TRUE", "1")
def current_datetime(alt:str=False) -> datetime.datetime:
    """Return the current local datetime.

    When ``alt`` is a non-empty strftime format, the formatted string is
    returned instead of the datetime object.
    """
    now = datetime.datetime.now()
    return now.strftime(alt) if alt else now
def current_date(alt:str=False) -> datetime.date:
    """Return today's local date.

    When ``alt`` is a non-empty strftime format, the formatted string is
    returned instead of the date object.
    """
    today = datetime.date.today()
    return today.strftime(alt) if alt else today
def clean_kwargs(*, ONLY:list=None, kwargs:dict=None) -> dict:
    """Sanitize keyword arguments before passing them on to another function.

    Args:
        ONLY: names of the keys to keep. When empty or omitted, no filtering
            is applied.
        kwargs: the keyword-argument dictionary to filter.

    Returns:
        When ``ONLY`` is given, a new dict holding just the listed keys that
        are present in ``kwargs``. Otherwise ``kwargs`` itself, or a fresh
        empty dict when ``kwargs`` was omitted.
    """
    # None defaults instead of []/{}: a mutable default would be shared
    # between calls and handed back to callers, who could then mutate it.
    if kwargs is None:
        kwargs = {}
    if ONLY:
        return {key: kwargs[key] for key in ONLY if key in kwargs}
    return kwargs
def return_value_from_dict(dictionary:dict, key:str, if_not:str=" "):
    """Fetch a value from a dictionary and prepare it for display.

    The key ``"__id"`` is treated as an alias for ``"_id"``.

    * dates and datetimes are formatted as ``YYYY/MM/DD``
    * sets, tuples and lists are joined with ``", "`` when every element is a
      string; otherwise they are returned unchanged
    * string results have double quotes escaped as ``\\"``
    * every other value (ObjectId, numbers, ...) is returned unchanged

    Args:
        dictionary: mapping to read from.
        key: key to look up.
        if_not: value used when the key is missing.

    Returns:
        The formatted string, or the untouched value for non-string results.
    """
    lookup_key = "_id" if key == "__id" else key
    value = dictionary.get(lookup_key, if_not)

    if isinstance(value, datetime.date):
        value = value.strftime('%Y/%m/%d')
    elif isinstance(value, (set, tuple, list)):
        try:
            value = ", ".join(value)
        except TypeError:
            # intentional: collections with non-string members stay as they are
            pass

    if isinstance(value, str):
        return value.replace('"', '\\"')
    return value
def snake2camel(snake:str, start_lower:bool=False) -> str:
    """
    Turn a snake_case identifier into CamelCase.

    With `start_lower` False (the default) the result starts with a capital
    letter; with `start_lower` True the first letter is lowercased, leaving
    any leading underscores in place.
    """
    def _keep_left(match):
        # drop the underscore, keep the character before it
        return match.group(1)

    def _lowercase(match):
        return match.group(1).lower()

    titled = snake.title()
    joined = re.sub("([0-9A-Za-z])_(?=[0-9A-Z])", _keep_left, titled)
    if not start_lower:
        return joined
    return re.sub("(^_*[A-Z])", _lowercase, joined)
def camel2snake(camel:str) -> str:
    """
    Turn a camelCase identifier into snake_case.

    An underscore is inserted between a letter and a following digit, and
    between a lowercase letter or digit and a following capital; the result
    is then lowercased.
    """
    digit_split = re.sub(r"([a-zA-Z])([0-9])", r"\1_\2", camel)
    word_split = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", digit_split)
    return word_split.lower()
def objectid_representer(dumper, data):
    """YAML representer: emit `data` as a scalar tagged ``!_id`` holding ``str(data)``."""
    text = str(data)
    return dumper.represent_scalar("!_id", text)
def objectid_constructor(loader, data):
    """YAML constructor: build an ObjectId from the scalar value of node `data`."""
    scalar = loader.construct_scalar(data)
    return ObjectId(scalar)
# Dump ObjectId values as "!_id"-tagged scalars whenever SafeDumper is used
# (this is the dumper behind yaml.safe_dump).
yaml.SafeDumper.add_representer(ObjectId, objectid_representer)
# Teach the loaders to turn "!_id" scalars back into ObjectId. Without an
# explicit Loader, yaml.add_constructor does not register on SafeLoader, so
# SafeLoader is registered separately; otherwise yaml.safe_load rejects the
# very tag that safe_dump writes.
yaml.add_constructor('!_id', objectid_constructor)
yaml.SafeLoader.add_constructor('!_id', objectid_constructor)
def _get_class_that_defined_method(meth):
    """Return the class on which `meth` was defined, or None if it cannot be found.

    For a bound method the MRO of the instance's class is searched first; if
    that yields nothing, the underlying function is examined. For a function,
    the owner name is derived from ``__qualname__`` and looked up as an
    attribute of the function's module.
    """
    if inspect.ismethod(meth):
        owner_type = meth.__self__.__class__
        for klass in inspect.getmro(owner_type):
            if klass.__dict__.get(meth.__name__) is meth:
                return klass
        # nothing matched in the MRO: continue with the plain function
        meth = meth.__func__

    if not inspect.isfunction(meth):
        return None

    # "Outer.method" -> "Outer"; anything after ".<locals>" is discarded first
    qualified = meth.__qualname__.split('.<locals>', 1)[0]
    owner_name = qualified.rsplit('.', 1)[0]
    candidate = getattr(inspect.getmodule(meth), owner_name)
    return candidate if isinstance(candidate, type) else None
def generate_new_id() -> str:
    """Return a freshly generated random UUID (version 4) as a string."""
    new_uuid = uuid.uuid4()
    return str(new_uuid)
def ensure_enums_to_strs(items: Union[Sequence[Union[Enum, str]], Type[Enum]]):
    """Convert a sequence of enum members and/or other values to strings.

    Enum members contribute ``str(member.value)``; anything else contributes
    ``str(item)``. Passing an Enum class iterates over its members.

    Args:
        items: a sequence of enum members or strings, or an Enum class.

    Returns:
        A list of strings, in the iteration order of ``items``.
    """
    return [
        str(item.value) if isinstance(item, Enum) else str(item)
        for item in items
    ]
def yaml_dump(data:dict) -> str:
    """Serialize `data` to a YAML string with ``yaml.safe_dump`` in block (non-flow) style."""
    dump_options = {"default_flow_style": False}
    return yaml.safe_dump(data, **dump_options)
def yaml_load(data, _file:bool=False) -> dict:
    """Parse YAML from a string/stream, or from a file path.

    Args:
        data: YAML text (or a stream) when ``_file`` is False; a path to a
            YAML file when ``_file`` is True.
        _file: treat ``data`` as a file path.

    Returns:
        The parsed document.
    """
    if not _file:
        return yaml.safe_load(data)

    # yaml.load requires an explicit Loader on current PyYAML releases.
    # FullLoader is what a Loader-less yaml.load resolved to on PyYAML 5.x;
    # older releases only provide Loader.
    # NOTE(review): neither loader is meant for untrusted files. Prefer
    # yaml.safe_load here if these files can come from outside, provided the
    # tags they use are registered on SafeLoader.
    loader_cls = getattr(yaml, "FullLoader", yaml.Loader)
    with open(data, 'r') as stream:  # closes the handle the original leaked
        return yaml.load(stream, Loader=loader_cls)
def json_dump(data:dict, pretty:bool=False) -> str:
    """Serialize `data` to JSON text with ``bson.json_util``.

    With `pretty` set, the output is indented by 4 spaces and keys are sorted.
    """
    options = {"indent": 4, "sort_keys": True} if pretty else {}
    return json_util.dumps(data, **options)
def json_load(data:str) -> dict:
    """Parse JSON text with ``bson.json_util`` and return the result."""
    parsed = json_util.loads(data)
    return parsed
def clean_traceback() -> str:
    """Return the formatted traceback of the exception currently being handled.

    The text comes straight from ``traceback.format_exc()``; no cleaning is
    applied yet.
    """
    # format_exc must be called: returning the function object itself would
    # contradict the declared ``str`` return type.
    traceback = _traceback()
    # TODO: cleaning logic, to dict, maybe make class?
    return traceback
def silent_drop_kwarg(kwargs:dict, key:str, reason:str=""):
    """Remove `key` from `kwargs`, log the removal at debug level, and return the removed value.

    When `reason` is non-empty it is included in the log message. A missing
    key raises KeyError, as with ``dict.pop`` without a default.
    """
    message = f"dropping key {key} for reason {reason}" if reason else f"dropping key {key}"
    logger.debug(message)
    return kwargs.pop(key)
# TODO: custom jsonschema validator
# INFO: tools to use with JSON samples
def type_from_schema(schema_type:str):
    """Look up the entry in TYPES for a schema type name (matched case-insensitively).

    Raises KeyError when the lowercased name is not a key of TYPES.
    """
    return TYPES[schema_type.lower()]
def schema_from_dict(dictionary:dict, additional:dict=None):
    """Create a simple JSON schema from a JSON sample document.

    Key conventions in the sample:

    * ``"name:type"`` - the part after the colon is looked up in
      SCHEMA_TYPES directly and the value is not inspected.
    * a ``*`` at either end of the key (e.g. ``"name*"``) marks the property
      as required; the asterisks are stripped from the property name.

    Without an explicit type, the type is inferred from the Python type of
    the sample value and mapped through SCHEMA_TYPES.

    Args:
        dictionary: the sample document.
        additional: currently unused by this function.

    Returns:
        A dict with ``"type": "object"``, a ``"required"`` list and a
        ``"properties"`` mapping of property name to ``{"type": ...}``.

    Raises:
        TypeError: when a value's type cannot be inferred.
        KeyError: when a type name is not present in SCHEMA_TYPES.
    """
    # Order matters: bool is a subclass of int and datetime is a subclass of
    # date, so the more specific type must be tested first.
    inferable = (
        (bool, "bool"),
        (str, "str"),
        (float, "float"),
        (int, "int"),
        (dict, "dict"),
        (ObjectId, "oid"),
        (datetime.datetime, "datetime"),
        (datetime.date, "date"),
    )

    schema = {"type": "object", "required": [], "properties" : {}}
    for key, value in dictionary.items():
        if ":" in key:
            key, _type = key.strip().split(":")
            _type = SCHEMA_TYPES[_type]
        else:
            for python_type, type_name in inferable:
                if isinstance(value, python_type):
                    _type = SCHEMA_TYPES[type_name]
                    break
            else:
                raise TypeError("unrecognized type '{}' for value '{}'".format(type(value), value))

        required = False
        if key.endswith("*"):
            key = key.strip("*")
            required = True

        schema["properties"][key] = {
            "type": _type
        }
        if required and key not in schema["required"]:
            schema["required"].append(key)
    return schema