Skip to content

cosmotech.coal.cosmotech_api.twin_data_layer

CSVSourceFile

Source code in cosmotech/coal/cosmotech_api/twin_data_layer.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
class CSVSourceFile:
    """
    Header-level description of a CSV file holding either nodes or relationships

    Only the header row of the file is read. A file whose header has both a source and a target
    column is treated as a relationship file; otherwise it needs an id column and is treated as
    a node file. Column roles are detected by comparing the lower-cased header to ID_COLUMN,
    SOURCE_COLUMN and TARGET_COLUMN.
    """

    def __init__(self, file_path: pathlib.Path):
        """
        Inspect the header of a CSV file and classify it as node or relationship data
        :param file_path: path to a file whose name ends with ".csv"
        :raises ValueError: if the name does not end with ".csv", or if the header has neither an id
            column nor both a source and a target column (this includes an empty file)
        """
        self.file_path = file_path
        if not file_path.name.endswith(".csv"):
            raise ValueError(T("coal.common.validation.not_csv_file").format(file_path=file_path))
        # newline="" lets the csv module do its own newline handling (quoted headers may embed one).
        with open(file_path, newline="") as _file:
            dr = DictReader(_file)
            # fieldnames is None when the file is empty; use an empty header so the validation
            # below reports the problem instead of a bare TypeError.
            self.fields = list(dr.fieldnames or [])
        # The name is known to end with ".csv": strip exactly that suffix.
        self.object_type = file_path.name[: -len(".csv")]

        self.id_column = None
        self.source_column = None
        self.target_column = None

        # Remember the header spelling actually used in the file for each special column.
        for _c in self.fields:
            if _c.lower() == ID_COLUMN:
                self.id_column = _c
            if _c.lower() == SOURCE_COLUMN:
                self.source_column = _c
            if _c.lower() == TARGET_COLUMN:
                self.target_column = _c

        has_id = self.id_column is not None
        has_source = self.source_column is not None
        has_target = self.target_column is not None

        is_relation = has_source and has_target

        if not has_id and not is_relation:
            LOGGER.error(T("coal.common.validation.invalid_nodes_relations").format(file_path=file_path))
            LOGGER.error(T("coal.common.validation.node_requirements").format(id_column=ID_COLUMN))
            LOGGER.error(
                T("coal.common.validation.relationship_requirements").format(
                    id_column=ID_COLUMN,
                    source_column=SOURCE_COLUMN,
                    target_column=TARGET_COLUMN,
                )
            )
            raise ValueError(T("coal.common.validation.invalid_nodes_relations").format(file_path=file_path))

        # A file with id, source and target columns is a relationship carrying an id property.
        self.is_node = has_id and not is_relation

        # Maps property name -> CSV column name. Regular columns map to themselves; the special
        # columns are exposed under their canonical names whatever their spelling in the file.
        special_columns = (self.id_column, self.source_column, self.target_column)
        self.content_fields = {_f: _f for _f in self.fields if _f not in special_columns}
        if has_id:
            self.content_fields[ID_COLUMN] = self.id_column
        if is_relation:
            self.content_fields[SOURCE_COLUMN] = self.source_column
            self.content_fields[TARGET_COLUMN] = self.target_column

    def reload(self, inplace: bool = False) -> "CSVSourceFile":
        """
        Read the header of the file again
        :param inplace: if True, re-initialize and return this instance; otherwise build a new one
        :return: the refreshed CSVSourceFile
        """
        if inplace:
            self.__init__(self.file_path)
            return self
        return CSVSourceFile(self.file_path)

    def generate_query_insert(self) -> str:
        """
        Generate a CREATE cypher query from the headers of the CSV file

        Each property is written as "<property name>: $<column name>", so the query expects one
        parameter per CSV column. Properties are listed by decreasing name length.
        :return: the Cypher query for CREATE
        """

        field_names = sorted(self.content_fields.keys(), key=len, reverse=True)
        property_map = (
            "{"
            + ", ".join(f"{property_name}: ${self.content_fields[property_name]}" for property_name in field_names)
            + "}"
        )

        if self.is_node:
            # The space + property map after the label is required: without the opening brace the
            # statement is not valid Cypher.
            return f"CREATE (:{self.object_type} {property_map})"

        # Relationship: match both endpoints on their id, then create the typed edge between them.
        return (
            "MATCH "
            + "(source {"
            + ID_COLUMN
            + ":$"
            + self.source_column
            + "}),\n"
            + "(target {"
            + ID_COLUMN
            + ":$"
            + self.target_column
            + "})\n"
            + "CREATE (source)-[rel:"
            + self.object_type
            + " "
            + property_map
            + "]->(target)\n"
        )

generate_query_insert()

Read a CSV file's headers and generate a CREATE Cypher query

Returns:

Type Description
str

the Cypher query for CREATE

Source code in cosmotech/coal/cosmotech_api/twin_data_layer.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def generate_query_insert(self) -> str:
    """
    Generate a CREATE cypher query from the headers of the CSV file

    Each property is written as "<property name>: $<column name>", so the query expects one
    parameter per CSV column. Properties are listed by decreasing name length.
    :return: the Cypher query for CREATE
    """

    field_names = sorted(self.content_fields.keys(), key=len, reverse=True)
    property_map = (
        "{"
        + ", ".join(f"{property_name}: ${self.content_fields[property_name]}" for property_name in field_names)
        + "}"
    )

    if self.is_node:
        # The space + property map after the label is required: without the opening brace the
        # statement is not valid Cypher.
        return f"CREATE (:{self.object_type} {property_map})"

    # Relationship: match both endpoints on their id, then create the typed edge between them.
    return (
        "MATCH "
        + "(source {"
        + ID_COLUMN
        + ":$"
        + self.source_column
        + "}),\n"
        + "(target {"
        + ID_COLUMN
        + ":$"
        + self.target_column
        + "})\n"
        + "CREATE (source)-[rel:"
        + self.object_type
        + " "
        + property_map
        + "]->(target)\n"
    )