
CosmoTech_Acceleration_Library.Modelops.core.io.model_exporter

ModelExporter

Bases: GraphHandler

Model Exporter for cached data

Source code in CosmoTech_Acceleration_Library/Modelops/core/io/model_exporter.py
class ModelExporter(GraphHandler):
    """
    Model Exporter for cached data
    """

    def __init__(self, host: str, port: int, name: str, password: str = None, export_dir: str = "/"):
        super().__init__(host=host, port=port, name=name, password=password)
        Path(export_dir).mkdir(parents=True, exist_ok=True)
        self.export_dir = export_dir

        self.mr = ModelReader(host=host, port=port, name=name, password=password)
        self.labels = [label[0] for label in self.graph.labels()]
        self.relationships = [relation[0] for relation in self.graph.relationship_types()]
        self.already_exported_nodes = {}
        self.already_exported_edges = []

    @GraphHandler.do_if_graph_exist
    def export_all_twins(self):
        """
        Export all twins
        :return: Csv files containing all twin instances exported into {export_dir} folder named by twin type
        """
        logger.debug("Start exporting twins...")
        logger.debug("Get twin types...")
        get_types_start = time.time()
        twin_names = self.mr.get_twin_types()
        get_types_end = time.time() - get_types_start
        logger.debug(f"Get twin types took {get_types_end} s")

        for twin_name in twin_names:
            logger.debug(f"Get twin info for type {twin_name} ...")
            get_twin_info_start = time.time()
            twin_results = self.mr.get_twins_by_type(twin_name)
            get_twin_info_end = time.time() - get_twin_info_start
            logger.debug(f"Get twin info for type {twin_name} took {get_twin_info_end} s")

            logger.debug(f"Export twin info for type {twin_name} ...")
            export_twin_info_start = time.time()
            CsvWriter.write_twin_data(self.export_dir, twin_name, twin_results)
            export_twin_info_end = time.time() - export_twin_info_start
            logger.debug(f"Export twin info for type {twin_name} took {export_twin_info_end} s")

            logger.debug(f"Twins exported :{twin_name}")
        logger.debug("... End exporting twins")

    @GraphHandler.do_if_graph_exist
    def export_all_relationships(self):
        """
        Export all relationships
        :return: Csv files containing all relationship instances exported into {export_dir}
        folder named by relationship type
        """
        logger.debug("Start exporting relationships...")
        logger.debug("Get relationship types...")
        get_relationship_types_start = time.time()
        relationship_names = self.mr.get_relationship_types()
        get_relationship_types_end = time.time() - get_relationship_types_start
        logger.debug(f"Get relationship types took {get_relationship_types_end} s")

        for relationship_name in relationship_names:
            logger.debug(f"Get relationship info for type {relationship_name} ...")
            get_relationship_info_start = time.time()
            relationship_result = self.mr.get_relationships_by_type(relationship_name)
            get_relationship_info_end = time.time() - get_relationship_info_start
            logger.debug(f"Get relationship info for type {relationship_name} took {get_relationship_info_end} s")

            logger.debug(f"Export relationship info for type {relationship_name} ...")
            export_relationship_info_start = time.time()
            CsvWriter.write_relationship_data(self.export_dir, relationship_name, relationship_result)
            export_relationship_info_end = time.time() - export_relationship_info_start
            logger.debug(f"Export relationship info for type {relationship_name} took {export_relationship_info_end} s")

            logger.debug(f"Relationships exported :{relationship_name}")
        logger.debug("... End exporting relationships")

    @GraphHandler.do_if_graph_exist
    def export_all_data(self):
        """
        Export all data
        :return: a bunch of csv files corresponding to graph data
        """
        self.export_all_twins()
        self.export_all_relationships()

    @GraphHandler.do_if_graph_exist
    def export_from_queries(self, queries: list):
        """
        Export data from queries
        Queries must be Cypher queries and return nodes and relationships objects to be exported
        Multiple instances of the same node or relationship will not be exported

        :param queries: list of queries to execute (Cypher queries)
        :return: None writes csv files corresponding to the results of the queries in the parameters
        """
        logger.info("Start exporting data from queries...")
        # foreach query, execute it and get nodes and relationships
        for query in queries:
            logger.info(f"Export data from query {query} ...")
            export_data_from_query_start = time.time()
            query_result = self.mr.query(query, read_only=True)

            # foreach query result, get nodes and relationships
            nodes_by_label = {key: [] for key in self.labels}
            edges_by_relation = {key: [] for key in self.relationships}
            for result in query_result.result_set:
                for data in result:
                    if type(data) == redis.commands.graph.node.Node:
                        if data.id not in self.already_exported_nodes:
                            self.already_exported_nodes.update({data.id: data.properties.get('id')})
                            nodes_by_label[data.label].append(data)
                    elif type(data) == redis.commands.graph.edge.Edge:
                        if data.id not in self.already_exported_edges:
                            self.already_exported_edges.append(data.id)
                            edges_by_relation[data.relation].append(data)

            # write node data into csv file
            for label, nodes in nodes_by_label.items():
                if nodes:
                    nodes_rows = [node.properties for node in nodes]
                    CsvWriter.write_data(self.export_dir, label, nodes_rows)

            # write edge data into csv file
            for relation, edges in edges_by_relation.items():
                if edges:
                    # add source and target to edge properties
                    edges_rows = []
                    for edge in edges:
                        logger.debug(f"Get source and target for edge {edge.id} ...")
                        edge.properties['source'] = self.get_node_id_from_sys_id(edge.src_node)
                        edge.properties['target'] = self.get_node_id_from_sys_id(edge.dest_node)
                        edges_rows.append(edge.properties)
                    CsvWriter.write_data(self.export_dir, relation, edges_rows)

            export_data_from_query_end = time.time() - export_data_from_query_start
            logger.debug(f"Export data from query took {export_data_from_query_end} s")

            logger.debug("Data from query exported")
        logger.info("... End exporting data from queries")

    @lru_cache
    def get_node_id_from_sys_id(self, sys_id: int) -> int:
        """
        Get node id from system id (RedisGraph id)
        :param sys_id: system id
        :return: node id
        """
        if sys_id in self.already_exported_nodes:
            return self.already_exported_nodes[sys_id]
        node_query = "MATCH (n) WHERE ID(n) = $id RETURN n.id"
        return self.mr.query(node_query, params={'id': sys_id}).result_set[0][0]
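
A minimal usage sketch, assuming a RedisGraph instance reachable at the (hypothetical) address below; the exporter creates the export directory if it does not already exist:

from CosmoTech_Acceleration_Library.Modelops.core.io.model_exporter import ModelExporter

# Hypothetical connection details; adjust to your RedisGraph deployment.
exporter = ModelExporter(
    host="localhost",
    port=6379,
    name="my_graph",            # graph (twin cache) name
    password=None,
    export_dir="/tmp/export",   # created with parents=True, exist_ok=True
)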

export_all_data()

Export all data. Returns: a set of CSV files corresponding to the graph data, written into the export directory.

Source code in CosmoTech_Acceleration_Library/Modelops/core/io/model_exporter.py
@GraphHandler.do_if_graph_exist
def export_all_data(self):
    """
    Export all data
    :return: a bunch of csv files corresponding to graph data
    """
    self.export_all_twins()
    self.export_all_relationships()
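
As the source above shows, export_all_data simply chains the two specific exports, so the single convenience call below is equivalent to calling export_all_twins and export_all_relationships in sequence (reusing the exporter built earlier):

# Equivalent to:
#   exporter.export_all_twins()
#   exporter.export_all_relationships()
exporter.export_all_data()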

export_all_relationships()

Export all relationships. Returns: CSV files containing all relationship instances, exported into the {export_dir} folder; one file per relationship type, named after it.

Source code in CosmoTech_Acceleration_Library/Modelops/core/io/model_exporter.py
@GraphHandler.do_if_graph_exist
def export_all_relationships(self):
    """
    Export all relationships
    :return: Csv files containing all relationship instances exported into {export_dir}
    folder named by relationship type
    """
    logger.debug("Start exporting relationships...")
    logger.debug("Get relationship types...")
    get_relationship_types_start = time.time()
    relationship_names = self.mr.get_relationship_types()
    get_relationship_types_end = time.time() - get_relationship_types_start
    logger.debug(f"Get relationship types took {get_relationship_types_end} s")

    for relationship_name in relationship_names:
        logger.debug(f"Get relationship info for type {relationship_name} ...")
        get_relationship_info_start = time.time()
        relationship_result = self.mr.get_relationships_by_type(relationship_name)
        get_relationship_info_end = time.time() - get_relationship_info_start
        logger.debug(f"Get relationship info for type {relationship_name} took {get_relationship_info_end} s")

        logger.debug(f"Export relationship info for type {relationship_name} ...")
        export_relationship_info_start = time.time()
        CsvWriter.write_relationship_data(self.export_dir, relationship_name, relationship_result)
        export_relationship_info_end = time.time() - export_relationship_info_start
        logger.debug(f"Export relationship info for type {relationship_name} took {export_relationship_info_end} s")

        logger.debug(f"Relationships exported :{relationship_name}")
    logger.debug("... End exporting relationships")

export_all_twins()

Export all twins. Returns: CSV files containing all twin instances, exported into the {export_dir} folder; one file per twin type, named after it.

Source code in CosmoTech_Acceleration_Library/Modelops/core/io/model_exporter.py
@GraphHandler.do_if_graph_exist
def export_all_twins(self):
    """
    Export all twins
    :return: Csv files containing all twin instances exported into {export_dir} folder named by twin type
    """
    logger.debug("Start exporting twins...")
    logger.debug("Get twin types...")
    get_types_start = time.time()
    twin_names = self.mr.get_twin_types()
    get_types_end = time.time() - get_types_start
    logger.debug(f"Get twin types took {get_types_end} s")

    for twin_name in twin_names:
        logger.debug(f"Get twin info for type {twin_name} ...")
        get_twin_info_start = time.time()
        twin_results = self.mr.get_twins_by_type(twin_name)
        get_twin_info_end = time.time() - get_twin_info_start
        logger.debug(f"Get twin info for type {twin_name} took {get_twin_info_end} s")

        logger.debug(f"Export twin info for type {twin_name} ...")
        export_twin_info_start = time.time()
        CsvWriter.write_twin_data(self.export_dir, twin_name, twin_results)
        export_twin_info_end = time.time() - export_twin_info_start
        logger.debug(f"Export twin info for type {twin_name} took {export_twin_info_end} s")

        logger.debug(f"Twins exported :{twin_name}")
    logger.debug("... End exporting twins")

export_from_queries(queries)

Export data from queries. Queries must be Cypher queries that return node and relationship objects to be exported; multiple instances of the same node or relationship are only exported once.

Parameters: queries (list of Cypher queries to execute). Returns: None; writes CSV files corresponding to the results of the given queries.

Source code in CosmoTech_Acceleration_Library/Modelops/core/io/model_exporter.py
@GraphHandler.do_if_graph_exist
def export_from_queries(self, queries: list):
    """
    Export data from queries
    Queries must be Cypher queries and return nodes and relationships objects to be exported
    Multiple instances of the same node or relationship will not be exported

    :param queries: list of queries to execute (Cypher queries)
    :return: None writes csv files corresponding to the results of the queries in the parameters
    """
    logger.info("Start exporting data from queries...")
    # foreach query, execute it and get nodes and relationships
    for query in queries:
        logger.info(f"Export data from query {query} ...")
        export_data_from_query_start = time.time()
        query_result = self.mr.query(query, read_only=True)

        # foreach query result, get nodes and relationships
        nodes_by_label = {key: [] for key in self.labels}
        edges_by_relation = {key: [] for key in self.relationships}
        for result in query_result.result_set:
            for data in result:
                if type(data) == redis.commands.graph.node.Node:
                    if data.id not in self.already_exported_nodes:
                        self.already_exported_nodes.update({data.id: data.properties.get('id')})
                        nodes_by_label[data.label].append(data)
                elif type(data) == redis.commands.graph.edge.Edge:
                    if data.id not in self.already_exported_edges:
                        self.already_exported_edges.append(data.id)
                        edges_by_relation[data.relation].append(data)

        # write node data into csv file
        for label, nodes in nodes_by_label.items():
            if nodes:
                nodes_rows = [node.properties for node in nodes]
                CsvWriter.write_data(self.export_dir, label, nodes_rows)

        # write edge data into csv file
        for relation, edges in edges_by_relation.items():
            if edges:
                # add source and target to edge properties
                edges_rows = []
                for edge in edges:
                    logger.debug(f"Get source and target for edge {edge.id} ...")
                    edge.properties['source'] = self.get_node_id_from_sys_id(edge.src_node)
                    edge.properties['target'] = self.get_node_id_from_sys_id(edge.dest_node)
                    edges_rows.append(edge.properties)
                CsvWriter.write_data(self.export_dir, relation, edges_rows)

        export_data_from_query_end = time.time() - export_data_from_query_start
        logger.debug(f"Export data from query took {export_data_from_query_end} s")

        logger.debug("Data from query exported")
    logger.info("... End exporting data from queries")

get_node_id_from_sys_id(sys_id) (cached)

Get a node id from a system id (RedisGraph internal id). Parameters: sys_id (the system id). Returns: the node id (the node's id property).

Source code in CosmoTech_Acceleration_Library/Modelops/core/io/model_exporter.py
@lru_cache
def get_node_id_from_sys_id(self, sys_id: int) -> int:
    """
    Get node id from system id (RedisGraph id)
    :param sys_id: system id
    :return: node id
    """
    if sys_id in self.already_exported_nodes:
        return self.already_exported_nodes[sys_id]
    node_query = "MATCH (n) WHERE ID(n) = $id RETURN n.id"
    return self.mr.query(node_query, params={'id': sys_id}).result_set[0][0]
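
This helper resolves a RedisGraph internal node id (the value held in edge.src_node or edge.dest_node) to the node's id property, consulting the already_exported_nodes cache before falling back to a Cypher lookup. A small sketch, where 12 is a hypothetical internal id:

# 12 is a hypothetical RedisGraph internal id; in practice the value comes
# from edge.src_node / edge.dest_node while exporting relationships.
business_id = exporter.get_node_id_from_sys_id(12)
print(business_id)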