CosmoTech_Acceleration_Library.Modelops.core.common.writer.CsvWriter

CsvWriter

CSV writer class for exporting twin and relationship graph query results to CSV files
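
A minimal usage sketch (the export directory, file names and the twin_result / relationship_result QueryResult objects below are hypothetical; the query results are assumed to come from your own graph queries, e.g. a MATCH returning nodes for twins, and a MATCH returning source id, target id and the relationship for links):

    # each twin result row is expected to hold a single graph node whose properties become the CSV columns
    CsvWriter.write_twin_data(export_dir="/tmp/export", file_name="Customer", query_result=twin_result)

    # each relationship result row is expected to hold (source id, target id, relationship); the relationship
    # properties become additional columns next to 'source' and 'target'
    CsvWriter.write_relationship_data(export_dir="/tmp/export", file_name="Customer_KNOWS", query_result=relationship_result)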

Source code in CosmoTech_Acceleration_Library/Modelops/core/common/writer/CsvWriter.py
class CsvWriter:
    """
    CSV writer class for exporting twin and relationship graph query results to CSV files
    """

    @staticmethod
    def _to_csv_format(val: any) -> str:
        # booleans (and their string representations) are written lowercase
        if isinstance(val, bool):
            return str(val).lower()
        # dicts are serialized as JSON
        if isinstance(val, dict):
            return json.dumps(val)
        if str(val) == 'True' or str(val) == 'False':
            return str(val).lower()
        # dict-looking strings are normalized to JSON
        if str(val).startswith('{') and str(val).endswith('}'):
            try:
                return json.dumps(json.loads(val))
            except json.decoder.JSONDecodeError:
                # not valid JSON (e.g. a single-quoted Python dict repr): parse it as a Python literal instead
                return json.dumps(ast.literal_eval(str(val)))
        return str(val)

    @staticmethod
    def _to_cosmo_key(val: any) -> str:
        # map the internal dt_id key back to the Cosmo Tech id key
        if str(val) == ModelUtil.dt_id_key:
            return ModelUtil.id_key
        return val

    @staticmethod
    def write_twin_data(export_dir: str, file_name: str, query_result: QueryResult,
                        delimiter: str = ',', quote_char: str = '\"') -> None:
        headers = set()
        rows = []
        for raw_data in query_result.result_set:
            row = {}
            # read all graph node properties
            for i in range(len(raw_data)):  # TODO: for the moment it's only a single-element list holding the node
                row.update({CsvWriter._to_cosmo_key(k): CsvWriter._to_csv_format(v) for k, v in raw_data[i].properties.items()})
            headers.update(row.keys())
            rows.append(row)

        output_file_name = f'{export_dir}/{file_name}.csv'
        logger.debug(f"Writing CSV file {output_file_name}")
        # newline='' lets the csv module manage line endings in the output file
        with open(output_file_name, 'w', newline='') as csvfile:
            csv_writer = csv.DictWriter(csvfile, fieldnames=headers, delimiter=delimiter, quotechar=quote_char, quoting=csv.QUOTE_MINIMAL)
            csv_writer.writeheader()
            csv_writer.writerows(rows)
        logger.debug(f"... CSV file {output_file_name} has been written")

    @staticmethod
    def write_relationship_data(export_dir: str, file_name: str, query_result: QueryResult, headers: list = None,
                                delimiter: str = ',', quote_char: str = '\"') -> None:
        # seed the header set with the mandatory source/target columns plus any caller-supplied headers
        headers = {'source', 'target', *(headers or [])}
        rows = []
        for raw_data in query_result.result_set:
            # the first two result columns are the source and target ids, the third holds the relationship
            row = {'source': raw_data[0], 'target': raw_data[1]}
            row.update({k: CsvWriter._to_csv_format(v) for k, v in raw_data[2].properties.items()})
            headers.update(row.keys())
            rows.append(row)

        output_file_name = f'{export_dir}/{file_name}.csv'
        logger.debug(f"Writing CSV file {output_file_name}")
        with open(output_file_name, 'w', newline='') as csvfile:
            csv_writer = csv.DictWriter(csvfile, fieldnames=headers, delimiter=delimiter, quotechar=quote_char, quoting=csv.QUOTE_MINIMAL)
            csv_writer.writeheader()
            csv_writer.writerows(rows)
        logger.debug(f"... CSV file {output_file_name} has been written")
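
For reference, a short sketch of how the _to_csv_format helper normalizes property values before they are written (the inputs are illustrative; expected results are shown as comments):

    CsvWriter._to_csv_format(True)        # 'true'      booleans are lowercased
    CsvWriter._to_csv_format('False')     # 'false'     string booleans too
    CsvWriter._to_csv_format({'a': 1})    # '{"a": 1}'  dicts become JSON strings
    CsvWriter._to_csv_format("{'a': 1}")  # '{"a": 1}'  dict-looking strings are re-serialized as JSON
    CsvWriter._to_csv_format(42)          # '42'        everything else falls back to str()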