Skip to content

CosmoTech_Acceleration_Library.Modelops.core.io.model_metadata

ModelMetadata

Bases: RedisHandler

Model Metadata management class for cached data

Source code in CosmoTech_Acceleration_Library/Modelops/core/io/model_metadata.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
class ModelMetadata(RedisHandler):
    """
    Model Metadata management class for cached data

    Every accessor reads or writes fields of the hash stored at ``self.metadata_key``
    through the client ``self.r``. Those two attributes and ``self.name`` are not
    defined in this class (presumably provided by RedisHandler - to be confirmed there).
    """

    # Field names used inside the metadata hash
    last_modified_date_key = "lastModifiedDate"
    last_version_key = "lastVersion"
    source_url_key = "adtUrl"
    graph_name_key = "graphName"
    graph_rotation_key = "graphRotation"

    def get_metadata(self) -> dict:
        """
        Get the metadata of the graph
        :return: the dict containing all graph metadata
        """
        return self.r.hgetall(self.metadata_key)

    def get_last_graph_version(self) -> str:
        """
        Get the current last version of the graph
        :return: the graph last version
        :raise KeyError if the metadata holds no version field
        """
        return self.get_metadata()[self.last_version_key]

    def get_graph_name(self) -> str:
        """
        Get the graph's name
        :return: the graph's name (read from the handler, not from the stored metadata)
        """
        return self.name

    def get_graph_source_url(self) -> str:
        """
        Get the datasource of the graph
        :return: the datasource of the graph
        :raise KeyError if the metadata holds no source url field
        """
        return self.get_metadata()[self.source_url_key]

    def get_graph_rotation(self) -> str:
        """
        Get the graph rotation of the graph
        :return: the graph rotation of the graph
        :raise KeyError if the metadata holds no graph rotation field
        """
        return self.get_metadata()[self.graph_rotation_key]

    def get_last_modified_date(self) -> datetime:
        """
        Get the last modified date of the graph
        :return: the last modified date of the graph
        :raise KeyError if the metadata holds no last modified date field
        """
        stored_date = self.get_metadata()[self.last_modified_date_key]
        return ModelUtil.convert_str_to_datetime(stored_date)

    def set_all_metadata(self, metadata: dict):
        """
        Set the metadata of the graph
        :param metadata the metadata to set
        :raise Exception if the current version is greater than or equal to the new one
        """
        current_metadata = self.get_metadata()
        if self.last_version_key in current_metadata:
            # Compare against the snapshot fetched above instead of querying the
            # store a second time: one round trip, and the check cannot observe a
            # different state than the membership test did.
            current_version = int(current_metadata[self.last_version_key])
            new_version = int(metadata[self.last_version_key])
            if new_version <= current_version:
                raise Exception(f"The current version {current_version} is equal or greater than the version to set: {new_version}")
        # Reached when no version is stored yet or when the new version is strictly greater.
        # NOTE(review): if self.r is a redis-py client, hmset is deprecated in favour of
        # hset(name, mapping=...) - confirm the pinned client version before switching.
        logger.debug(f"Metatadata to set : {metadata}")
        self.r.hmset(self.metadata_key, metadata)

    def set_metadata(self,
                     last_graph_version: int,
                     graph_source_url: str,
                     graph_rotation: int) -> dict:
        """
        Set the metadata of the graph
        :param last_graph_version the new version
        :param graph_source_url the source url
        :param graph_rotation the graph rotation
        :return the metadata set
        :raise Exception if the current version is greater than or equal to the new one
        """
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12; moving to an
        # aware datetime requires checking ModelUtil.convert_datetime_to_str first.
        metadata = {
            self.last_version_key: str(last_graph_version),
            self.graph_name_key: self.name,
            self.source_url_key: graph_source_url,
            self.graph_rotation_key: str(graph_rotation),
            self.last_modified_date_key: ModelUtil.convert_datetime_to_str(datetime.utcnow())
        }
        logger.debug(f"Metatadata to set : {metadata}")
        self.set_all_metadata(metadata=metadata)
        # Honour the declared return type and the documented contract.
        return metadata

    def set_last_graph_version(self, last_graph_version: int):
        """
        Set the current last version of the graph and refresh the last modified date
        :param last_graph_version the new version (stored as its string form)
        """
        self.r.hset(self.metadata_key, self.last_version_key, str(last_graph_version))
        logger.debug(f"Graph last_graph_version to set : {str(last_graph_version)}")
        self.update_last_modified_date()

    def set_graph_source_url(self, graph_source_url: str):
        """
        Set the datasource of the graph and refresh the last modified date
        :param graph_source_url the source url
        """
        self.r.hset(self.metadata_key, self.source_url_key, graph_source_url)
        logger.debug(f"Graph source_url to set : {str(graph_source_url)}")
        self.update_last_modified_date()

    def set_graph_rotation(self, graph_rotation: int):
        """
        Set the graph rotation of the graph and refresh the last modified date
        :param graph_rotation the graph rotation (stored as its string form)
        """
        self.r.hset(self.metadata_key, self.graph_rotation_key, str(graph_rotation))
        logger.debug(f"Graph graph_rotation to set : {str(graph_rotation)}")
        self.update_last_modified_date()

    def update_last_modified_date(self):
        """
        Update the last modified date of the graph with the current UTC time
        """
        self.r.hset(self.metadata_key, self.last_modified_date_key, ModelUtil.convert_datetime_to_str(datetime.utcnow()))

    def update_last_version(self):
        """
        Update the last version of the graph
        The stored version is incremented by one, or initialised to 0 when no
        version is stored yet.
        """
        # NOTE(review): this is a read-then-write sequence, not an atomic increment.
        current_metadata = self.get_metadata()
        if self.last_version_key in current_metadata:
            new_version = int(current_metadata[self.last_version_key]) + 1
        else:
            new_version = 0
        # set_last_graph_version stringifies the value and already refreshes the
        # last modified date, so no extra date update is needed here.
        self.set_last_graph_version(new_version)

get_graph_name()

Get the graph's name :return: the graph's name

Source code in CosmoTech_Acceleration_Library/Modelops/core/io/model_metadata.py
37
38
39
40
41
42
def get_graph_name(self) -> str:
    """
    Give back the name of the graph, as held by the ``name`` attribute
    :return: the graph's name
    """
    graph_name = self.name
    return graph_name

get_graph_rotation()

Get the graph rotation of the graph :return: the graph rotation of the graph

Source code in CosmoTech_Acceleration_Library/Modelops/core/io/model_metadata.py
51
52
53
54
55
56
def get_graph_rotation(self) -> str:
    """
    Look up the graph rotation field in the graph metadata
    :return: the graph rotation of the graph
    """
    graph_metadata = self.get_metadata()
    rotation = graph_metadata[self.graph_rotation_key]
    return rotation

get_graph_source_url()

Get the datasource of the graph :return: the datasource of the graph

Source code in CosmoTech_Acceleration_Library/Modelops/core/io/model_metadata.py
44
45
46
47
48
49
def get_graph_source_url(self) -> str:
    """
    Look up the datasource field in the graph metadata
    :return: the datasource of the graph
    """
    graph_metadata = self.get_metadata()
    source_url = graph_metadata[self.source_url_key]
    return source_url

get_last_graph_version()

Get the current last version of the graph :return: the graph last version

Source code in CosmoTech_Acceleration_Library/Modelops/core/io/model_metadata.py
30
31
32
33
34
35
def get_last_graph_version(self) -> str:
    """
    Look up the last version field in the graph metadata
    :return: the graph last version
    """
    graph_metadata = self.get_metadata()
    last_version = graph_metadata[self.last_version_key]
    return last_version

get_last_modified_date()

Get the last modified date of the graph :return: the last modified date of the graph

Source code in CosmoTech_Acceleration_Library/Modelops/core/io/model_metadata.py
58
59
60
61
62
63
64
def get_last_modified_date(self) -> datetime:
    """
    Read the last modified date field of the graph metadata and convert it
    with ModelUtil.convert_str_to_datetime
    :return: the last modified date of the graph
    """
    graph_metadata = self.get_metadata()
    stored_date = graph_metadata[self.last_modified_date_key]
    parsed_date = ModelUtil.convert_str_to_datetime(stored_date)
    return parsed_date

get_metadata()

Get the metadata of the graph :return: the dict containing all graph metadata

Source code in CosmoTech_Acceleration_Library/Modelops/core/io/model_metadata.py
23
24
25
26
27
28
def get_metadata(self) -> dict:
    """
    Fetch every field stored under the metadata key
    :return: the dict containing all graph metadata
    """
    client = self.r
    all_fields = client.hgetall(self.metadata_key)
    return all_fields

set_all_metadata(metadata)

Set the metadata of the graph :param metadata the metadata to set :raise Exception if the current version is greater than or equal to the new one

Source code in CosmoTech_Acceleration_Library/Modelops/core/io/model_metadata.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def set_all_metadata(self, metadata: dict):
    """
    Set the metadata of the graph
    :param metadata the metadata to set
    :raise Exception if the current version is greater than or equal to the new one
    """
    current_metadata = self.get_metadata()
    if self.last_version_key in current_metadata:
        # Compare against the snapshot fetched above instead of querying the
        # store a second time: one round trip, and the check cannot observe a
        # different state than the membership test did.
        current_version = int(current_metadata[self.last_version_key])
        new_version = int(metadata[self.last_version_key])
        if new_version <= current_version:
            raise Exception(f"The current version {current_version} is equal or greater than the version to set: {new_version}")
    # Reached when no version is stored yet or when the new version is strictly greater.
    logger.debug(f"Metatadata to set : {metadata}")
    self.r.hmset(self.metadata_key, metadata)

set_graph_rotation(graph_rotation)

Set the graph rotation of the graph :param graph_rotation the graph rotation

Source code in CosmoTech_Acceleration_Library/Modelops/core/io/model_metadata.py
125
126
127
128
129
130
131
132
def set_graph_rotation(self, graph_rotation: int):
    """
    Store the graph rotation (as a string) in the graph metadata, then
    refresh the last modified date
    :param graph_rotation the graph rotation
    """
    rotation_text = str(graph_rotation)
    client = self.r
    client.hset(self.metadata_key, self.graph_rotation_key, rotation_text)
    logger.debug(f"Graph graph_rotation to set : {rotation_text}")
    self.update_last_modified_date()

set_graph_source_url(graph_source_url)

Set the datasource of the graph :param graph_source_url the source url

Source code in CosmoTech_Acceleration_Library/Modelops/core/io/model_metadata.py
116
117
118
119
120
121
122
123
def set_graph_source_url(self, graph_source_url: str):
    """
    Store the datasource url in the graph metadata, then refresh the last
    modified date
    :param graph_source_url the source url
    """
    client = self.r
    client.hset(self.metadata_key, self.source_url_key, graph_source_url)
    logger.debug(f"Graph source_url to set : {graph_source_url!s}")
    self.update_last_modified_date()

set_last_graph_version(last_graph_version)

Set the current last version of the graph :param last_graph_version the new version

Source code in CosmoTech_Acceleration_Library/Modelops/core/io/model_metadata.py
107
108
109
110
111
112
113
114
def set_last_graph_version(self, last_graph_version: int):
    """
    Store the last version (as a string) in the graph metadata, then
    refresh the last modified date
    :param last_graph_version the new version
    """
    version_text = str(last_graph_version)
    client = self.r
    client.hset(self.metadata_key, self.last_version_key, version_text)
    logger.debug(f"Graph last_graph_version to set : {version_text}")
    self.update_last_modified_date()

set_metadata(last_graph_version, graph_source_url, graph_rotation)

Set the metadata of the graph :param last_graph_version the new version :param graph_source_url the source url :param graph_rotation the graph rotation :return the metadata set :raise Exception if the current version is greater than or equal to the new one

Source code in CosmoTech_Acceleration_Library/Modelops/core/io/model_metadata.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def set_metadata(self,
                 last_graph_version: int,
                 graph_source_url: str,
                 graph_rotation: int) -> dict:
    """
    Set the metadata of the graph
    :param last_graph_version the new version
    :param graph_source_url the source url
    :param graph_rotation the graph rotation
    :return the metadata set
    :raise Exception if the current version is greater than or equal to the new one
    """
    # Version and rotation are stored in their string form; the graph name comes
    # from self.name and the modification date is the current UTC time.
    metadata = {
        self.last_version_key: str(last_graph_version),
        self.graph_name_key: self.name,
        self.source_url_key: graph_source_url,
        self.graph_rotation_key: str(graph_rotation),
        self.last_modified_date_key: ModelUtil.convert_datetime_to_str(datetime.utcnow())
    }
    logger.debug(f"Metatadata to set : {metadata}")
    self.set_all_metadata(metadata=metadata)
    # Honour the declared return type and the documented contract
    # (nothing was returned before).
    return metadata

update_last_modified_date()

Update the last modified date of the graph

Source code in CosmoTech_Acceleration_Library/Modelops/core/io/model_metadata.py
134
135
136
137
138
def update_last_modified_date(self):
    """
    Write the current UTC time, formatted by ModelUtil.convert_datetime_to_str,
    into the last modified date field of the graph metadata
    """
    client = self.r
    client.hset(
        self.metadata_key,
        self.last_modified_date_key,
        ModelUtil.convert_datetime_to_str(datetime.utcnow()),
    )

update_last_version()

Update the last version of the graph

Source code in CosmoTech_Acceleration_Library/Modelops/core/io/model_metadata.py
140
141
142
143
144
145
146
147
148
149
150
151
152
def update_last_version(self):
    """
    Update the last version of the graph
    The stored version is incremented by one, or initialised to 0 when no
    version is stored yet.
    """
    # NOTE(review): this is a read-then-write sequence, not an atomic increment.
    current_metadata = self.get_metadata()
    if self.last_version_key in current_metadata:
        new_version = int(current_metadata[self.last_version_key]) + 1
    else:
        new_version = 0
    # set_last_graph_version stringifies the value and already refreshes the
    # last modified date, so no extra date update is needed here.
    self.set_last_graph_version(new_version)