Skip to content

cosmotech.coal.azure.adx.tables

tables

check_and_create_table(kusto_client, database, table_name, data)

Check if a table exists and create it if it doesn't.

Args: kusto_client: The Kusto client database: The database name table_name: The table name data: The PyArrow table data

Returns: bool: True if the table was created, False if it already existed

Source code in cosmotech/coal/azure/adx/tables.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def check_and_create_table(kusto_client: KustoClient, database: str, table_name: str, data: pyarrow.Table) -> bool:
    """
    Check if a table exists and create it if it doesn't.

    Args:
        kusto_client: The Kusto client
        database: The database name
        table_name: The table name
        data: The PyArrow table data

    Returns:
        bool: True if the table was created, False if it already existed
    """
    LOGGER.debug(T("coal.services.adx.checking_table_exists"))
    if not table_exists(kusto_client, database, table_name):
        from cosmotech.coal.azure.adx.utils import create_column_mapping

        mapping = create_column_mapping(data)
        LOGGER.debug(T("coal.services.adx.creating_nonexistent_table"))
        create_table(kusto_client, database, table_name, mapping)
        return True
    return False

create_table(client, database, table_name, schema)

Create a table in the database.

Args: client: The KustoClient to use database: The name of the database table_name: The name of the table to create schema: Dictionary mapping column names to ADX types

Returns: bool: True if the table was created successfully, False otherwise

Source code in cosmotech/coal/azure/adx/tables.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def create_table(client: KustoClient, database: str, table_name: str, schema: Dict[str, str]) -> bool:
    """
    Create a table in the database.

    Args:
        client: The KustoClient to use
        database: The name of the database
        table_name: The name of the table to create
        schema: Dictionary mapping column names to ADX types

    Returns:
        bool: True if the table was created successfully, False otherwise
    """
    LOGGER.debug(T("coal.services.adx.creating_table").format(database=database, table_name=table_name))

    create_query = f".create-merge table {table_name}("

    for column_name, column_type in schema.items():
        create_query += f"{column_name}:{column_type},"

    create_query = create_query[:-1] + ")"

    LOGGER.debug(T("coal.services.adx.create_query").format(query=create_query))

    try:
        client.execute(database, create_query)
        LOGGER.info(T("coal.services.adx.table_created").format(table_name=table_name))
        return True
    except Exception as e:
        LOGGER.error(T("coal.services.adx.table_creation_error").format(table_name=table_name, error=str(e)))
        return False

table_exists(client, database, table_name)

Check if a table exists in the database.

Args: client: The KustoClient to use database: The name of the database table_name: The name of the table to check

Returns: bool: True if the table exists, False otherwise

Source code in cosmotech/coal/azure/adx/tables.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def table_exists(client: KustoClient, database: str, table_name: str) -> bool:
    """
    Check if a table exists in the database.

    Args:
        client: The KustoClient to use
        database: The name of the database
        table_name: The name of the table to check

    Returns:
        bool: True if the table exists, False otherwise
    """
    LOGGER.debug(T("coal.services.adx.checking_table").format(database=database, table_name=table_name))

    get_tables_query = f".show database ['{database}'] schema| distinct TableName"
    tables = client.execute(database, get_tables_query)

    for r in tables.primary_results[0]:
        if table_name == r[0]:
            LOGGER.debug(T("coal.services.adx.table_exists").format(table_name=table_name))
            return True

    LOGGER.debug(T("coal.services.adx.table_not_exists").format(table_name=table_name))
    return False