
cosmotech.coal.azure.blob


Azure Blob Storage operations module.

This module provides functions for interacting with Azure Blob Storage, including uploading data from the Store.

dump_store_to_azure(store_folder, account_name, container_name, tenant_id, client_id, client_secret, output_type='sqlite', file_prefix='')

Dump Store data to Azure Blob Storage.

Args:
    store_folder: Folder containing the Store
    account_name: Azure Storage account name
    container_name: Azure Storage container name
    tenant_id: Azure tenant ID
    client_id: Azure client ID
    client_secret: Azure client secret
    output_type: Output file type (sqlite, csv, or parquet)
    file_prefix: Prefix for uploaded files

Raises:
    ValueError: If the output type is invalid
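For context, a minimal call might look like the sketch below. The account, container, credential, and prefix values are placeholders, not values shipped with this module.

from cosmotech.coal.azure.blob import dump_store_to_azure

dump_store_to_azure(
    store_folder="/tmp/store",           # folder containing the Store files
    account_name="mystorageaccount",     # placeholder storage account
    container_name="simulation-output",  # placeholder container
    tenant_id="<tenant-id>",
    client_id="<client-id>",
    client_secret="<client-secret>",
    output_type="parquet",               # one of: sqlite, csv, parquet
    file_prefix="run-42/",               # uploaded blobs are prefixed with this
)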

Source code in cosmotech/coal/azure/blob.py
def dump_store_to_azure(
    store_folder: str,
    account_name: str,
    container_name: str,
    tenant_id: str,
    client_id: str,
    client_secret: str,
    output_type: str = "sqlite",
    file_prefix: str = "",
) -> None:
    """
    Dump Store data to Azure Blob Storage.

    Args:
        store_folder: Folder containing the Store
        account_name: Azure Storage account name
        container_name: Azure Storage container name
        tenant_id: Azure tenant ID
        client_id: Azure client ID
        client_secret: Azure client secret
        output_type: Output file type (sqlite, csv, or parquet)
        file_prefix: Prefix for uploaded files

    Raises:
        ValueError: If the output type is invalid
    """
    _s = Store(store_location=store_folder)

    if output_type not in VALID_TYPES:
        LOGGER.error(T("coal.common.validation.invalid_output_type").format(output_type=output_type))
        raise ValueError(T("coal.common.validation.invalid_output_type").format(output_type=output_type))

    container_client = BlobServiceClient(
        account_url=f"https://{account_name}.blob.core.windows.net/",
        credential=ClientSecretCredential(tenant_id=tenant_id, client_id=client_id, client_secret=client_secret),
    ).get_container_client(container_name)

    def data_upload(data_stream: BytesIO, file_name: str):
        uploaded_file_name = file_prefix + file_name
        data_stream.seek(0)
        size = len(data_stream.read())
        data_stream.seek(0)

        LOGGER.info(T("coal.common.data_transfer.sending_data").format(size=size))
        container_client.upload_blob(name=uploaded_file_name, data=data_stream, length=size, overwrite=True)

    if output_type == "sqlite":
        _file_path = _s._database_path
        _file_name = "db.sqlite"
        _uploaded_file_name = file_prefix + _file_name
        LOGGER.info(
            T("coal.common.data_transfer.file_sent").format(file_path=_file_path, uploaded_name=_uploaded_file_name)
        )
        with open(_file_path, "rb") as data:
            container_client.upload_blob(name=_uploaded_file_name, data=data, overwrite=True)
    else:
        tables = list(_s.list_tables())
        for table_name in tables:
            _data_stream = BytesIO()
            _file_name = None
            _data = _s.get_table(table_name)
            if not len(_data):
                LOGGER.info(T("coal.common.data_transfer.table_empty").format(table_name=table_name))
                continue
            if output_type == "csv":
                _file_name = table_name + ".csv"
                pc.write_csv(_data, _data_stream)
            elif output_type == "parquet":
                _file_name = table_name + ".parquet"
                pq.write_table(_data, _data_stream)
            LOGGER.info(
                T("coal.common.data_transfer.sending_table").format(table_name=table_name, output_type=output_type)
            )
            data_upload(_data_stream, _file_name)
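To verify an upload, the container can be inspected with the same azure-storage-blob and azure-identity clients the function builds internally. This is a sketch only; the account, container, credential, and prefix values are the same placeholders as above.

from azure.identity import ClientSecretCredential
from azure.storage.blob import BlobServiceClient

# Build a container client the same way dump_store_to_azure does.
container_client = BlobServiceClient(
    account_url="https://mystorageaccount.blob.core.windows.net/",  # placeholder account
    credential=ClientSecretCredential(
        tenant_id="<tenant-id>",
        client_id="<client-id>",
        client_secret="<client-secret>",
    ),
).get_container_client("simulation-output")  # placeholder container

# List the blobs written under the prefix that was passed as file_prefix.
for blob in container_client.list_blobs(name_starts_with="run-42/"):
    print(blob.name, blob.size)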