Skip to content

CosmoTech_Acceleration_Library.Modelops.core.common.writer.CsvWriter

CsvWriter

Csv Writer class

Source code in CosmoTech_Acceleration_Library/Modelops/core/common/writer/CsvWriter.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class CsvWriter:
    """
    Csv Writer class
    """

    @staticmethod
    def _to_csv_format(val: any) -> str:
        if isinstance(val, bool):
            return str(val).lower()
        if isinstance(val, dict):
            return json.dumps(val)
        if str(val) == 'True' or str(val) == 'False':
            return str(val).lower()
        if str(val).startswith('{') and str(val).endswith('}'):
            try:
                return json.dumps(json.loads(val))
            except json.decoder.JSONDecodeError:
                return json.dumps(ast.literal_eval(str(val)))
        return str(val)

    @staticmethod
    def _to_cosmo_key(val: any) -> str:
        if str(val) == ModelUtil.dt_id_key:
            return ModelUtil.id_key
        return val

    @staticmethod
    def write_twin_data(export_dir: str,
                        file_name: str,
                        query_result: QueryResult,
                        delimiter: str = ',',
                        quote_char: str = '\"') -> None:
        headers = set()
        rows = []
        for raw_data in query_result.result_set:
            row = {}
            # read all graph link properties
            for i in range(len(raw_data)):  # TODO for the moment its only a len 1 list with the node
                row.update({
                    CsvWriter._to_cosmo_key(k): CsvWriter._to_csv_format(v)
                    for k, v in raw_data[i].properties.items()
                })
            headers.update(row.keys())
            rows.append(row)

        output_file_name = f'{export_dir}/{file_name}.csv'
        logger.debug(f"Writing CSV file {output_file_name}")
        with open(output_file_name, 'w') as csvfile:
            csv_writer = csv.DictWriter(csvfile,
                                        fieldnames=headers,
                                        delimiter=delimiter,
                                        quotechar=quote_char,
                                        quoting=csv.QUOTE_MINIMAL)
            csv_writer.writeheader()
            csv_writer.writerows(rows)
        logger.debug(f"... CSV file {output_file_name} has been written")

    @staticmethod
    def write_relationship_data(export_dir: str,
                                file_name: str,
                                query_result: QueryResult,
                                headers: list = [],
                                delimiter: str = ',',
                                quote_char: str = '\"') -> None:
        headers = {'source', 'target'}
        rows = []
        for raw_data in query_result.result_set:
            row = {'source': raw_data[0], 'target': raw_data[1]}
            row.update({k: CsvWriter._to_csv_format(v) for k, v in raw_data[2].properties.items()})
            headers.update(row.keys())
            rows.append(row)

        output_file_name = f'{export_dir}/{file_name}.csv'
        logger.debug(f"Writing CSV file {output_file_name}")
        with open(output_file_name, 'w') as csvfile:
            csv_writer = csv.DictWriter(csvfile,
                                        fieldnames=headers,
                                        delimiter=delimiter,
                                        quotechar=quote_char,
                                        quoting=csv.QUOTE_MINIMAL)
            csv_writer.writeheader()
            csv_writer.writerows(rows)
        logger.debug(f"... CSV file {output_file_name} has been written")

    @staticmethod
    def write_data(export_dir: str,
                   file_name: str,
                   input_rows: dict,
                   delimiter: str = ',',
                   quote_char: str = '\"') -> None:
        output_file_name = export_dir + file_name + '.csv'
        write_header = False
        if not os.path.exists(output_file_name):
            write_header = True

        headers = set()
        output_rows = []
        for row in input_rows:
            output_rows.append({CsvWriter._to_cosmo_key(k): CsvWriter._to_csv_format(v) for k, v in row.items()})
            headers.update(row.keys())

        logger.info(f"Writing file {output_file_name} ...")
        with open(output_file_name, 'a') as csvfile:
            csv_writer = csv.DictWriter(csvfile,
                                        fieldnames=headers,
                                        delimiter=delimiter,
                                        quotechar=quote_char,
                                        quoting=csv.QUOTE_MINIMAL)
            if write_header:
                csv_writer.writeheader()
            csv_writer.writerows(output_rows)
        logger.debug(f"... file {output_file_name} has been written")