diff --git a/msticnb/__init__.py b/msticnb/__init__.py index edcceee..5486f8e 100644 --- a/msticnb/__init__.py +++ b/msticnb/__init__.py @@ -38,13 +38,14 @@ import sys from typing import Any, Dict, List, Optional -from .data_providers import DataProviders, init as dp_init # noqa:F401 -from .read_modules import discover_modules, nblts, nb_index, find # noqa:F401 -from .options import get_opt, set_opt # noqa:F401 +from ._version import VERSION +from .data_providers import DataProviders # noqa:F401 +from .data_providers import init as dp_init # noqa:F401 from .nb_browser import NBBrowser # noqa:F401 from .nb_pivot import add_pivot_funcs # noqa:F401 - -from ._version import VERSION +from .notebooklet_func import NBFunc +from .options import get_opt, set_opt # noqa:F401 +from .read_modules import discover_modules, find, nb_index, nblts # noqa:F401 __version__ = VERSION @@ -53,6 +54,9 @@ discover_modules() print(f"Notebooklets: {len(list(nblts.iter_classes()))} notebooklets loaded.") +# Notebooklet functions registry +funcs: Dict[str, NBFunc] = {} + def init( query_provider: str, diff --git a/msticnb/common.py b/msticnb/common.py index deec361..6e939d9 100644 --- a/msticnb/common.py +++ b/msticnb/common.py @@ -287,5 +287,5 @@ def mp_version(): def check_mp_version(required_version: str) -> bool: - """Returns true if the installed version is >= `required_version`.""" - return mp_version().major >= parse_version(required_version).major + """Return true if the installed version is >= `required_version`.""" + return mp_version().major >= parse_version(required_version).major # type: ignore diff --git a/msticnb/notebooklet.py b/msticnb/notebooklet.py index 0d87b7b..3ffaa6d 100644 --- a/msticnb/notebooklet.py +++ b/msticnb/notebooklet.py @@ -8,19 +8,21 @@ import re import warnings from abc import ABC, abstractmethod +from copy import copy from functools import wraps -from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple +from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union import pandas as pd from IPython.core.getipython import get_ipython from IPython.display import HTML, display -from tqdm.auto import tqdm from msticpy.common.timespan import TimeSpan +from tqdm.auto import tqdm from ._version import VERSION from .common import MsticnbDataProviderError, MsticnbError from .data_providers import DataProviders from .nb_metadata import NBMetadata, read_mod_metadata +from .notebooklet_func import NBFunc from .notebooklet_result import NotebookletResult from .options import get_opt, set_opt @@ -36,10 +38,11 @@ class Notebooklet(ABC): name="Notebooklet", description="Base class", default_options=[] ) module_path = "" + nb_functions: List[NBFunc] = [] def __init__(self, data_providers: Optional[DataProviders] = None, **kwargs): """ - Intialize a new instance of the notebooklet class. + Initialize a new instance of the notebooklet class. Parameters ---------- @@ -60,6 +63,7 @@ def __init__(self, data_providers: Optional[DataProviders] = None, **kwargs): self._set_tqdm_notebook(get_opt("verbose")) self._last_result: Any = None self.timespan = TimeSpan(period="1d") + self.last_run_args: Dict[str, Any] = {} self._inst_default_silent: Optional[bool] = kwargs.get("silent") self._current_run_silent: Optional[bool] = None set_opt("temp_silent", self.silent) @@ -198,6 +202,15 @@ def run( self.timespan = TimeSpan(timespan=timespan) elif "start" in kwargs and "end" in kwargs: self.timespan = TimeSpan(start=kwargs.get("start"), end=kwargs.get("end")) + self.last_run_args = { + "value": value, + "data": data, + "timespan": timespan, + "options": options, + "silent": self.silent, + "qry_prov": self.query_provider, + **kwargs, + } return NotebookletResult(notebooklet=self) def get_pivot_run(self, get_timespan: Callable[[], TimeSpan]): @@ -435,7 +448,7 @@ def match_terms(cls, search_terms: str) -> Tuple[bool, int]: 1 for term in terms if re.search(term, search_text, re.IGNORECASE) ) - return (bool(match_count == len(terms)), match_count) + return match_count == len(terms), match_count @staticmethod def _set_tqdm_notebook(verbose=False): @@ -571,3 +584,123 @@ def list_methods(self) -> List[str]: desc = f_doc.split("\n", maxsplit=1)[0] if f_doc else "" method_desc.append(f"{name} - '{desc}'") return method_desc + + @classmethod + def add_nb_function(cls, nb_func: Union[str, NBFunc], **kwargs): + """ + Add a notebooklet function to the class. + + Functions added to the class will be run when run_nb_funcs is + called. + + Parameters + ---------- + nb_func : Union[str, NBFunc] + The fully-qualified name of the NB function or + an instance of the NBFunc class. + + Optional Parameters + ------------------- + options : Optional[Union[str, List[str]]], optional + Override the options controlling the running of the function. + header : Optional[str], optional + Override the header to display when running the function, by default None + text : Optional[str], optional + Override the text to display when running the function, by default None + result_attrib : Optional[str], optional + Override the name of the notebooklet result attribute to assign the + function return value to, by default None + + """ + nb_func_instance = cls._lookup_nb_func(nb_func) + nb_func_instance = cls._customize_nb_func(nb_func_instance, kwargs) + cls.nb_functions.append(nb_func_instance) + + def run_nb_func(self, nb_func: Union[str, NBFunc], **kwargs): + """ + Run the notebooklet function and return the results. + + By default, the function will be passed the set of kwargs used + in the last call to notebooklet.run(). + You can override some of the nb_func attributes as well as the + default runtime parameters by supplying the keyword + arguments with the same names. + + Parameters + ---------- + nb_func : Union[str, NBFunc] + The registered name of the function or an instance of + the NBFunc class + + Optional Parameters + ------------------- + options : Optional[Union[str, List[str]]], optional + Override the options controlling the running of the function. + header : Optional[str], optional + Override the header to display when running the function, by default None + text : Optional[str], optional + Override the text to display when running the function, by default None + result_attrib : Optional[str], optional + Override the name of the notebooklet result attribute to assign the + function return value to, by default None + kwargs : Any + Any other keyword parameters to supply to the function. + + Returns + ------- + Any : + The returned results from the function. + + Notes + ----- + If the notebooklet function has the `result_attrib` attribute set, the + results will also be saved to the notebooklet result with the + attribute of this name. + + See Also + -------- + NBFunc - the Notebooklet function class. + run_nb_funcs - Run all functions. + + """ + nb_func_instance = self._lookup_nb_func(nb_func) + + nb_func_instance = self._customize_nb_func(nb_func=nb_func_instance, **kwargs) + # use the notebooklet default kwargs + func_def_kwargs = {**self.last_run_args} + # add any custom kwargs from invocation. + func_def_kwargs.update(kwargs) + return nb_func_instance.run(**func_def_kwargs) + + @staticmethod + def _lookup_nb_func(nb_func: Union[str, NBFunc]) -> NBFunc: + """Return the NBFunc or find it in the funcs registry.""" + if isinstance(nb_func, NBFunc): + return nb_func + if isinstance(nb_func, str): + # pylint: disable=import-outside-toplevel + import msticnb + + nb_reg_func = msticnb.funcs.get(nb_func) + if not nb_reg_func: + raise LookupError(f"Function {nb_func} not found") + return nb_reg_func + raise TypeError( + f"Invalid type for `nb_func` parameter {nb_func} ({type(nb_func)})." + ) + + def run_nb_funcs(self): + """Run all notebooklet functions defined for the notebooklet.""" + for nb_func in self.__class__.nb_functions: + nb_func.run(**(self.last_run_args)) + + @classmethod + def _customize_nb_func(cls, nb_func: NBFunc, kwargs) -> NBFunc: + """If any kwargs""" + valid_update_opts = {"options", "header", "text", "result_attrib"} + if kwargs and valid_update_opts & kwargs.keys(): + nb_func = copy(nb_func) + for attrib, value in kwargs.items(): + if attrib in valid_update_opts: + setattr(nb_func, attrib, value) + return nb_func diff --git a/msticnb/notebooklet_func.py b/msticnb/notebooklet_func.py new file mode 100644 index 0000000..8e33934 --- /dev/null +++ b/msticnb/notebooklet_func.py @@ -0,0 +1,431 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- +""" +Notebooklet function class and decorator. + +This allows custom functions to be added to a notebooklet + +Examples +-------- + +Function defintions would look like the following examples. + +A standard function that returns some data + +.. code:: python + + from msticnb.nb.azsent.host.host_summary import HostSummary + @nb_func( + notebooklets=[HostSummary], # Notebooklets to add func to + name="get_logon_data", # Name of the function + func_type="run", # The type of function + options="logons", # The options controlling if the func is run + header="Header displayed when func is run", + text="Descriptive text displayed.", + result_attrib="host_logons" # The attribute name to add to the result. + ) + def fetch_host_logons(timerange, **kwargs): + ... + return xyz.get_stuff(timerange) + + +Most of the decorator parameters are optional. This defintion does not +attach the function to a notebooklet and uses the function name. +No options are specified so it will always run. No display text is +used. + +.. code:: python + + from msticnb.nb.azsent.host.host_summary import HostSummary + @nb_func( + result_attrib="host_logons" # The attribute name to add to the result. + ) + def fetch_host_logons(timerange, **kwargs): + ... + return xyz.get_stuff(timerange) + +Queries - built-in queries +~~~~~~~~~~~~~~~~~~~~~~~~~~ +A query function using a built-in query. Note this function +should return either a query name or a tuple of the query name plus a dictionary to map +generic parameters to params expected by the query. + +.. code:: python + + @nb_func( + notebooklets=[HostSummary], + name="get_logon_data", + func_type="query", + options="logons", + header="Header displayed when func is run", + text="Descriptive text displayed.", + result_attrib="host_logons + ) + def query_host_logons(qry_prov, timerange, value, **kwargs): + return ( + "WindowsSecurity.get_host_logons", + {"value": "host_name"} + ) + +Queries - query strings +~~~~~~~~~~~~~~~~~~~~~~~ + +A query function using a built-in query. Note this function +should return a tuple of the query text plus a dictionary to map +generic parameters to params expected by the query. +The latter is optional if no mapping is needed. + +Using raw query text +.. code:: python + + @nb_func( + func_type="query", + options="logons", + result_attrib="host_logons + ) + def query_host_logons(qry_prov, timerange, value, **kwargs): + return ( + "SecurityEvent | where start = datetime({start}) " + "| where Computer == '{value}'", + ) + +.. code:: python + + @nb_func( + func_type="query", + options="logons", + result_attrib="host_logons + ) + def query_host_logons(qry_prov, timerange, value, **kwargs): + return ( + "WindowsSecurity.list_host_logons", + {"value": "host_name"}, + ) + +Each function will be invoked with a standard set of kwargs: + +- qry_prov - the query provider +- timerange - timerange for the notebooklet operations. Note: this is + passed to queries as 'start' and 'end'. +- options - the Notebooklet options arg +- silent - whether to display (this is handled by the NBFunc class) +- value - the `value` arg to the Notebooklet run +- data - the `data` arg to the Notebooklet run + +""" + +import contextlib +import warnings +from pathlib import Path +from typing import Any, Callable, Dict, List, Literal, Optional, Set, Union, get_args + +from ._version import VERSION +from .common import set_text + +__version__ = VERSION +__author__ = "Ian Hellen" + +NBFuncType = Literal["run", "display", "query"] + +_VALID_FUNC_TYPES = set(get_args(NBFuncType)) + + +class NBFunc: + """Notebooklet custom function.""" + + # pylint: disable=too-many-arguments + def __init__( + self, + func: Callable[..., Any], + name: Optional[str] = None, + func_type: NBFuncType = "run", + options: Optional[Union[str, List[str]]] = None, + header: Optional[str] = None, + text: Optional[str] = None, + result_attrib: Optional[str] = None, + **kwargs, + ): + """ + Create an instance of a notebooklet function. + + Parameters + ---------- + func : Callable[..., Any] + The function to run + name : Optional[str], optional + Optional name, by default this is the name of the + original function + func_type : NBFuncType, optional + Either "run" - simple execution and return results or + "query" - treat the contents of the function as a query + definition to execute and return results from, by default "run" + options : Optional[Union[str, List[str]]], optional + If the function should be run only when a given notebooklet + option is specified add the option as a string or list of + string options, by default None + header : Optional[str], optional + The header to display when running the function, by default None + text : Optional[str], optional + The descriptive text to display when running the function, by default None + result_attrib : Optional[str], optional + The name of the notebooklet result attribute to assign the + function return value to, by default None + + Raises + ------ + ValueError + If an invalid function type is specified. + + """ + self.name = name or func.__name__ + self.options = ( + options if isinstance(options, list) else [options] if options else [] + ) + self.header = header + self.text = text + self.result_attrib = result_attrib + self.name_map = kwargs.pop("qry_param_map", {}) + self.doc = func.__doc__ + + if func_type not in _VALID_FUNC_TYPES: + raise ValueError( + "Function type must be one of the following", + ", ".join(f"'{func_type}'" for func_type in _VALID_FUNC_TYPES), + ) + self.func_type = func_type + self._run_func: Optional[Callable[..., Any]] = func + + def run(self, **kwargs): + """Run method.""" + if not self._check_options(**kwargs): + return None + # Create a set_text wrapper to optionally display title/text + text_wrapper = set_text(title=self.header, text=self.text) + if self.func_type == "run" and callable(self._run_func): + # wrap the function in the text display wrapper and run + wrapped_func = text_wrapper(self._run_func) + return self._set_result_attrib(wrapped_func(**kwargs)) + if self.func_type == "query" and isinstance(self._run_func, str): + # wrap the function in the text display wrapper and run + # Since this a query we handle a little differently + wrapped_func = text_wrapper(self.run_query) + return self._set_result_attrib(wrapped_func(self._run_func, **kwargs)) + warnings.warn( + f"Notebooklet custom function {self.name} has an" + f" invalid 'func_type': {self.func_type}" + ) + return None + + def run_query(self, query, **kwargs): + """Run a query.""" + qry_prov = kwargs.get("qry_prov") + if not qry_prov: + raise ValueError( + f"No 'qry_prov' in parameters to query function {self.name}" + ) + + qry_kwargs = self._remap_names(self.name_map, kwargs) + timespan = kwargs.get("timespan") + if timespan: + qry_kwargs["start"] = timespan.start + qry_kwargs["end"] = timespan.end + # the query could be a built-in query + qry_suffix = f".{query}" + builtin_query = next( + iter( + query for query in qry_prov.list_queries() if query.endswith(qry_suffix) + ) + ) + + if builtin_query: + # The query could be fully-qualified or just a stem + # if it's a dotted name, get the stem + if "." in query: + query = query.rsplit(".", maxsplit=1)[1] + # and look for it in the all_queries container. + query_func = getattr(qry_prov.all_queries, query) + return self._set_result_attrib(query_func(**qry_kwargs)) + # If not, assume the query is a string. + with contextlib.suppress(IndexError, KeyError): + # and try to str.format it with kwargs - but ignore if this fails. + query = query.format(**qry_kwargs) + return self._set_result_attrib(qry_prov.exec_query(query)) + + def _check_options(self, **kwargs) -> bool: + """Return True if func options are in current options.""" + nb_options: Set[str] = set(kwargs.pop("options", [])) + if not self.options: + return True + return any(opt for opt in self.options if opt in nb_options) + + def _set_result_attrib(self, func_result, **kwargs): + if not self.result_attrib: + return func_result + nb_result = kwargs.pop("result", None) + setattr(nb_result, self.result_attrib, func_result) + return func_result + + @staticmethod + def _remap_names( + name_map: Dict[str, str], kwargs: Dict[str, Any] + ) -> Dict[str, Any]: + """Return dictionary with mapped names.""" + return {name_map.get(name, name): val for name, val in kwargs.items()} + + +# pylint: disable=no-member +def _register_nb_function(nb_func_cls, reg_path): + """Add the function to msticnb funcs dictionary.""" + msticnb = _get_msticnb() + if not hasattr(msticnb, "funcs"): + setattr(msticnb, "funcs", {}) + msticnb.funcs[reg_path] = nb_func_cls + + +# pylint: disable=no-member +def list_functions(): + """Return list of notebooklet functions.""" + msticnb = _get_msticnb() + return list(msticnb.funcs) if hasattr(msticnb.funcs) else [] + + +# pylint: disable=too-many-arguments +def nb_func( + name: Optional[str] = None, + func_type: NBFuncType = "run", + reg_path: Optional[str] = None, + notebooklets: Optional[Union[str, List[str]]] = None, + options: Optional[str] = None, + header: Optional[str] = None, + text: Optional[str] = None, + result_attrib: Optional[str] = None, +): + """ + Register the function as a notebook function. + + Parameters + ---------- + name : Optional[str], optional + Optional name for the function, by default, the original + function name is used. + func_type : NBFuncType, optional + Either "run" - simple execution and return results or + "query" - treat the contents of the function as a query + definition to execute and return results from, by default "run" + options : Optional[str], optional + If the function should be run only when a given notebooklet + option is specified add the option as a string or list of + string options, by default None + header : Optional[str], optional + The header to display when running the function, by default None + text : Optional[str], optional + The descriptive text to display when running the function, by default None + result_attrib : Optional[str], optional + The name of the notebooklet result attribute to assign the + function return value to, by default None. If not specified, the results + of the function execution will not be saved to the results + object. + reg_path : Optional[str], optional + The dotted path name to register the function, by default this + is derived from the relative path of the module that the function + is defined in (e.g. sent.host.queries.my_host_queries) + notebooklets : Optional[Union[str, List[str]]], optional + A list of notebooklet classes to attach the function to, by default None + + Returns + ------- + Callable + The wrapped function. + + Notes + ----- + The decorator does not alter the behavior of the function, it creates + an NBFunc class that wraps the function and registers this function + with the global notebooklets functions. + + Examples + -------- + Function registration with several options specified. + + .. code:: python + + from msticnb.nb.azsent.host.host_summary import HostSummary + @nb_func( + notebooklets=[HostSummary], # Notebooklets to add func to + name="get_logon_data", # Name of the function + func_type="run", # The type of function + options="logons", # The options controlling if the func is run + header="Header displayed when func is run", + text="Descriptive text displayed.", + result_attrib="host_logons" # The attribute name to add to the result. + ) + def fetch_host_logons(timerange, **kwargs): + ... + return xyz.get_stuff(timerange) + + + Same function with minimal options. + + .. code:: python + + from msticnb.nb.azsent.host.host_summary import HostSummary + @nb_func( + result_attrib="host_logons" # The attribute name to add to the result. + ) + def fetch_host_logons(timerange, **kwargs): + ... + return xyz.get_stuff(timerange) + + """ + + def nb_func_wrapper(func): + """Add the function to the set of classes.""" + qry_param_map = None + if func_type == "query": + # run the function to get the values specified in the body + query_settings = func() + if isinstance(query_settings, tuple): + func, qry_param_map = query_settings + else: + func = query_settings, {} + # create the NBFunc wrapper class. + custom_func = NBFunc( + func=func, + name=name, + func_type=func_type, + options=options, + header=header, + text=text, + result_attrib=result_attrib, + qry_param_map=qry_param_map, + ) + # Add it to any notebooklets defined. + if notebooklets: + for notebooklet in notebooklets: + notebooklet.add_custom_func(custom_func) + + # register the NBFunc + _register_nb_function(custom_func, reg_path or _get_default_func_path(func)) + return func + + return nb_func_wrapper + + +def _get_default_func_path(func) -> str: + """Create function path from module path.""" + root_path = Path(_get_msticnb().__file__.replace("__init__.py", "")) + func_path = Path(func.__module__.__file__) + return ".".join( + [*(func_path.resolve().relative_to(root_path).parts[:-1]), func_path.stem] + ) + + +def _get_msticnb(): + """Return root module, avoiding circular imports.""" + # pylint: disable=import-outside-toplevel, cyclic-import + import msticnb + + return msticnb