Skip to content

cosmotech.coal.utils.postgresql

postgresql

adapt_table_to_schema(data, target_schema)

Adapt a PyArrow table to match a target schema with detailed logging.

Source code in cosmotech/coal/utils/postgresql.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def adapt_table_to_schema(data: pa.Table, target_schema: pa.Schema) -> pa.Table:
    """
    Adapt a PyArrow table to match a target schema with detailed logging.

    Columns present in ``target_schema`` but missing from ``data`` are added
    as all-null columns; columns absent from ``target_schema`` are dropped;
    columns whose type differs are cast to the target type, falling back to
    an all-null column when the cast fails.

    Args:
        data: Input table to adapt.
        target_schema: Schema the returned table must conform to.

    Returns:
        A new table whose schema is exactly ``target_schema``.
    """
    LOGGER.debug(T("coal.services.postgresql.schema_adaptation_start").format(rows=len(data)))
    LOGGER.debug(T("coal.services.postgresql.original_schema").format(schema=data.schema))
    LOGGER.debug(T("coal.services.postgresql.target_schema").format(schema=target_schema))

    target_fields = {field.name: field.type for field in target_schema}
    new_columns = []

    # Track adaptations for summary
    added_columns = []
    type_conversions = []
    failed_conversions = []

    # Process each field in target schema
    for field_name, target_type in target_fields.items():
        if field_name in data.column_names:
            # Column exists - try to cast to target type
            col = data[field_name]
            original_type = col.type

            if original_type != target_type:
                LOGGER.debug(
                    T("coal.services.postgresql.casting_column").format(
                        field_name=field_name,
                        original_type=original_type,
                        target_type=target_type,
                    )
                )
                try:
                    new_col = pa.compute.cast(col, target_type)
                    new_columns.append(new_col)
                    type_conversions.append(f"{field_name}: {original_type} -> {target_type}")
                except (pa.ArrowInvalid, pa.ArrowNotImplementedError) as e:
                    # ArrowInvalid covers value-level failures; ArrowNotImplementedError
                    # is raised for unsupported type combinations. Both must fall back
                    # to nulls instead of aborting the whole adaptation.
                    LOGGER.warning(
                        T("coal.services.postgresql.cast_failed").format(
                            field_name=field_name,
                            original_type=original_type,
                            target_type=target_type,
                            error=str(e),
                        )
                    )
                    new_columns.append(pa.nulls(len(data), type=target_type))
                    failed_conversions.append(f"{field_name}: {original_type} -> {target_type}")
            else:
                new_columns.append(col)
        else:
            # Column doesn't exist - add nulls
            LOGGER.debug(T("coal.services.postgresql.adding_missing_column").format(field_name=field_name))
            new_columns.append(pa.nulls(len(data), type=target_type))
            added_columns.append(field_name)

    # Log columns that will be dropped (present in data, absent from target)
    dropped_columns = [name for name in data.column_names if name not in target_fields]
    if dropped_columns:
        LOGGER.debug(T("coal.services.postgresql.dropping_columns").format(columns=dropped_columns))

    # Create new table
    adapted_table = pa.Table.from_arrays(new_columns, schema=target_schema)

    # Log summary of adaptations
    LOGGER.debug(T("coal.services.postgresql.adaptation_summary"))
    if added_columns:
        LOGGER.debug(T("coal.services.postgresql.added_columns").format(columns=added_columns))
    if dropped_columns:
        LOGGER.debug(T("coal.services.postgresql.dropped_columns").format(columns=dropped_columns))
    if type_conversions:
        LOGGER.debug(T("coal.services.postgresql.successful_conversions").format(conversions=type_conversions))
    if failed_conversions:
        LOGGER.debug(T("coal.services.postgresql.failed_conversions").format(conversions=failed_conversions))

    LOGGER.debug(T("coal.services.postgresql.final_schema").format(schema=adapted_table.schema))
    return adapted_table

get_postgresql_table_schema(target_table_name, postgres_host, postgres_port, postgres_db, postgres_schema, postgres_user, postgres_password, force_encode=False)

Get the schema of an existing PostgreSQL table using SQL queries.

Args:
    target_table_name: Name of the table.
    postgres_host: PostgreSQL host.
    postgres_port: PostgreSQL port.
    postgres_db: PostgreSQL database name.
    postgres_schema: PostgreSQL schema name.
    postgres_user: PostgreSQL username.
    postgres_password: PostgreSQL password.

Returns: PyArrow Schema if table exists, None otherwise

Source code in cosmotech/coal/utils/postgresql.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def get_postgresql_table_schema(
    target_table_name: str,
    postgres_host: str,
    postgres_port: str,
    postgres_db: str,
    postgres_schema: str,
    postgres_user: str,
    postgres_password: str,
    force_encode: bool = False,
) -> Optional[pa.Schema]:
    """
    Retrieve the schema of an existing PostgreSQL table via the ADBC driver.

    Args:
        target_table_name: Name of the table.
        postgres_host: PostgreSQL host.
        postgres_port: PostgreSQL port.
        postgres_db: PostgreSQL database name.
        postgres_schema: PostgreSQL schema name.
        postgres_user: PostgreSQL username.
        postgres_password: PostgreSQL password.
        force_encode: Forwarded to the connection-URI builder
            (``generate_postgresql_full_uri``).

    Returns:
        PyArrow Schema if the table exists, None otherwise.
    """
    LOGGER.debug(
        T("coal.services.postgresql.getting_schema").format(
            postgres_schema=postgres_schema, target_table_name=target_table_name
        )
    )

    connection_uri = generate_postgresql_full_uri(
        postgres_host,
        postgres_port,
        postgres_db,
        postgres_user,
        postgres_password,
        force_encode,
    )

    with dbapi.connect(connection_uri) as connection:
        try:
            table_schema = connection.adbc_get_table_schema(
                target_table_name,
                db_schema_filter=postgres_schema,
            )
        except adbc_driver_manager.ProgrammingError:
            # The driver reports a missing table as a ProgrammingError;
            # warn and signal absence with None.
            LOGGER.warning(
                T("coal.services.postgresql.table_not_found").format(
                    postgres_schema=postgres_schema, target_table_name=target_table_name
                )
            )
            return None
        return table_schema