Skip to content

Machina Periodic Worker API#

Analysis worker base class. Workers inheriting from this class analyze based on a schedule.

Source code in machina/core/periodic_worker.py
class PeriodicWorker():
    """Analysis worker base class.  Workers inheriting from this class analyze based on a schedule.

    Subclasses override :meth:`callback`; :meth:`start` registers that callback
    with the scheduler using the ``interval`` value from the worker
    configuration and then runs the scheduler.
    """

    def __init__(self):
        # The concrete subclass name selects the worker-specific config file
        # in _load_configs, so it must be set before the configs are loaded.
        self.cls_name = self.__class__.__name__
        self.config = self._load_configs()
        self.app = Rocketry()

        # Logging
        level = logging.getLevelName(self.config['worker']['log_level'])
        logging.basicConfig(level=level, format='[*] %(message)s')
        self.logger = logging.getLogger(__name__)

        # reduce Rocketry logging level
        logging.getLogger('rocketry.scheduler').setLevel(logging.ERROR)

        # neo4j set connection
        # NOTE(review): user/pass are interpolated verbatim; credentials
        # containing URL-reserved characters would presumably need quoting.
        _cfg = self.config['neo4j']
        config.DATABASE_URL = f"bolt://{_cfg['user']}:{_cfg['pass']}@{_cfg['host']}:{_cfg['port']}/{_cfg['db_name']}"

    #############################################################
    # Privates
    def _load_configs(self) -> dict:
        """load configuration files from expected path, return as dictionary

        Reads ``paths.json``, ``neo4j.json`` and ``types.json`` from
        ``/configs``, plus ``workers/PeriodicWorker.json`` overlaid with
        ``workers/<class name>.json``.

        :return: the configuration dictionary with the keys ``paths``,
            ``neo4j``, ``types`` and ``worker``
        :rtype: dict
        """

        fdir = '/configs'

        def _read_json(*parts):
            # Load one JSON document located under the config directory.
            with open(Path(fdir, *parts), 'r') as f:
                return json.load(f)

        paths_cfg = _read_json('paths.json')
        neo4j_cfg = _read_json('neo4j.json')
        types_cfg = _read_json('types.json')

        # Base-worker configurations, will be overridden by worker-specific
        # configurations if there is overlap
        worker_cfg = _read_json('workers', 'PeriodicWorker.json')

        # Worker-specific configuration
        worker_cfg.update(_read_json('workers', self.cls_name+'.json'))

        return dict(paths=paths_cfg,
                    neo4j=neo4j_cfg,
                    types=types_cfg,
                    worker=worker_cfg)
    #############################################################

    def start(self):
        """start running callback at interval"""
        self.logger.info(f"starting with interval: {self.config['worker']['interval']}")
        self.app.task(self.config['worker']['interval'], func=self.callback)
        self.app.run()

    def callback(self):
        """implement in subclass

        :raises NotImplementedError: always, unless overridden by a subclass
        """
        # NotImplemented is a comparison sentinel, not an exception; raising
        # it produces a TypeError. NotImplementedError is the intended signal.
        raise NotImplementedError

    #############################################################
    # Triggers
    @staticmethod
    def n_nodes_added_since(
        n: int,
        node_cls: Type[Base],
        duration: timedelta):
        """return True if at least 'n' nodes of 'node_cls' type have a 'ts' value
        within the last 'duration' of time, otherwise return False.

        Declared as a static method because it uses no instance state; it can
        be called on the class or on an instance.

        :param n: the threshold number of nodes to consider before returning True
        :type n: int
        :param node_cls: the neomodel OGM class to use for counting node instances
        :type node_cls: type[Base]
        :param duration: the datetime.timedelta object specifying the threshold duration of time
        :type duration: timedelta
        :return: True if at least 'n' matching nodes were found, False otherwise
        :rtype: bool
        """

        # get adjusted timestamp for provided
        # threshold duration
        now = datetime.now(timezone.utc)
        duration_ts = (now - duration)

        # filter nodes within the duration window
        nodes = node_cls.nodes.filter(ts__gte=duration_ts).order_by('ts')
        return len(nodes) >= n

callback() #

implement in subclass

Source code in machina/core/periodic_worker.py
def callback(self):
    """implement in subclass

    :raises NotImplementedError: always, unless overridden by a subclass
    """
    # NotImplemented is a comparison sentinel, not an exception; raising it
    # produces a TypeError. NotImplementedError is the intended signal.
    raise NotImplementedError

n_nodes_added_since(n, node_cls, duration) #

return True if 'n' nodes of 'node_cls' type have been added within a duration of time, return False if either (or both) condition was not met.

Parameters:

Name Type Description Default
n int

the threshold number of nodes to consider before returning True

required
node_cls Type[Base]

the neomodel OGM class to use for counting node instances

required
duration timedelta

the datetime.timedelta object specifying the threshold duration of time

required

Returns:

Type Description
bool

True if both conditions were met, False if either (or both) was not met

Source code in machina/core/periodic_worker.py
def n_nodes_added_since(
    n: int,
    node_cls: Type[Base],
    duration: timedelta):
    """Report whether at least 'n' nodes of 'node_cls' fall inside a recent time window.

    The window starts at the current UTC time minus 'duration'; nodes whose
    'ts' value is at or after that instant are counted.

    :param n: the minimum number of nodes required for a True result
    :type n: int
    :param node_cls: the neomodel OGM class whose node instances are counted
    :type node_cls: type[Base]
    :param duration: the datetime.timedelta giving the length of the window
    :type duration: timedelta
    :return: True if at least 'n' nodes were found in the window, False otherwise
    :rtype: bool
    """

    # earliest timestamp that still counts as inside the window
    cutoff = datetime.now(timezone.utc) - duration

    # nodes at or after the cutoff, ordered by timestamp
    recent = node_cls.nodes.filter(ts__gte=cutoff).order_by('ts')
    return len(recent) >= n

start() #

start running callback at interval

Source code in machina/core/periodic_worker.py
def start(self):
    """Register the callback with the scheduler at the configured interval, then run it."""
    # the schedule comes from the worker configuration's 'interval' entry
    interval = self.config['worker']['interval']
    self.logger.info(f"starting with interval: {interval}")

    scheduler = self.app
    scheduler.task(interval, func=self.callback)
    scheduler.run()