Skip to content

CosmoTech_Acceleration_Library.Modelops.core.io.model_writer

ModelWriter

Bases: VersionedGraphHandler

Model Writer for cached data

Source code in CosmoTech_Acceleration_Library/Modelops/core/io/model_writer.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class ModelWriter(VersionedGraphHandler):
    """
    Model writer for cached data: creates twins and relationships by
    running generated queries against this handler's graph
    """

    @update_last_modified_date
    def create_twin(self, twin_type: str, properties: dict):
        """
        Add a twin to the graph
        The query is built by ModelUtil.create_twin_query, logged at debug
        level, then executed on self.graph
        :param twin_type: type of the twin to create
        :param properties: properties of the twin to create
        """
        twin_query = ModelUtil.create_twin_query(twin_type, properties)
        logger.debug(f"Query: {twin_query}")
        self.graph.query(twin_query)

    @update_last_modified_date
    def create_relationship(self, relationship_type: str, properties: dict):
        """
        Add a relationship to the graph
        The query is built by ModelUtil.create_relationship_query, logged at
        debug level, then executed on self.graph
        :param relationship_type: type of the relationship to create
        :param properties: properties of the relationship to create
        """
        relationship_query = ModelUtil.create_relationship_query(relationship_type, properties)
        logger.debug(f"Query: {relationship_query}")
        self.graph.query(relationship_query)

create_relationship(relationship_type, properties)

Create a relationship.

Parameters:
- relationship_type: the relationship type
- properties: the relationship properties

Source code in CosmoTech_Acceleration_Library/Modelops/core/io/model_writer.py
28
29
30
31
32
33
34
35
36
37
@update_last_modified_date
def create_relationship(self, relationship_type: str, properties: dict):
    """
    Add a relationship to the graph
    The query is built by ModelUtil.create_relationship_query, logged at
    debug level, then executed on self.graph
    :param relationship_type: type of the relationship to create
    :param properties: properties of the relationship to create
    """
    relationship_query = ModelUtil.create_relationship_query(relationship_type, properties)
    logger.debug(f"Query: {relationship_query}")
    self.graph.query(relationship_query)

create_twin(twin_type, properties)

Create a twin.

Parameters:
- twin_type: the twin type
- properties: the twin properties

Source code in CosmoTech_Acceleration_Library/Modelops/core/io/model_writer.py
17
18
19
20
21
22
23
24
25
26
@update_last_modified_date
def create_twin(self, twin_type: str, properties: dict):
    """
    Add a twin to the graph
    The query is built by ModelUtil.create_twin_query, logged at debug
    level, then executed on self.graph
    :param twin_type: type of the twin to create
    :param properties: properties of the twin to create
    """
    twin_query = ModelUtil.create_twin_query(twin_type, properties)
    logger.debug(f"Query: {twin_query}")
    self.graph.query(twin_query)