Dump Store data to Azure Blob Storage.

Args:
    store_folder: Folder containing the Store
    account_name: Azure Storage account name
    container_name: Azure Storage container name
    tenant_id: Azure tenant ID
    client_id: Azure client ID
    client_secret: Azure client secret
    output_type: Output file type (sqlite, csv, or parquet)
    file_prefix: Prefix for uploaded files

Raises:
    ValueError: If the output type is invalid
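A minimal usage sketch, assuming the function is imported from cosmotech.coal.azure.blob (the module path shown below); the Store folder, storage account, container, and service-principal values are placeholders:

from cosmotech.coal.azure.blob import dump_store_to_azure

# Placeholder values: substitute a real Store folder, storage account,
# container, and service-principal credentials.
dump_store_to_azure(
    store_folder="./data/store",
    account_name="mystorageaccount",
    container_name="my-container",
    tenant_id="00000000-0000-0000-0000-000000000000",
    client_id="00000000-0000-0000-0000-000000000000",
    client_secret="<client-secret>",
    output_type="parquet",   # "sqlite", "csv", or "parquet"
    file_prefix="run-001/",  # optional prefix prepended to every uploaded blob name
)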
Source code in cosmotech/coal/azure/blob.py
def dump_store_to_azure(
    store_folder: str,
    account_name: str,
    container_name: str,
    tenant_id: str,
    client_id: str,
    client_secret: str,
    output_type: str = "sqlite",
    file_prefix: str = "",
) -> None:
    """
    Dump Store data to Azure Blob Storage.

    Args:
        store_folder: Folder containing the Store
        account_name: Azure Storage account name
        container_name: Azure Storage container name
        tenant_id: Azure tenant ID
        client_id: Azure client ID
        client_secret: Azure client secret
        output_type: Output file type (sqlite, csv, or parquet)
        file_prefix: Prefix for uploaded files

    Raises:
        ValueError: If the output type is invalid
    """
    _s = Store(store_location=store_folder)

    # Reject unsupported output types before opening any connection.
    if output_type not in VALID_TYPES:
        LOGGER.error(T("coal.common.validation.invalid_output_type").format(output_type=output_type))
        raise ValueError(T("coal.common.validation.invalid_output_type").format(output_type=output_type))

    # Authenticate with a service principal and get a client for the target container.
    container_client = BlobServiceClient(
        account_url=f"https://{account_name}.blob.core.windows.net/",
        credential=ClientSecretCredential(tenant_id=tenant_id, client_id=client_id, client_secret=client_secret),
    ).get_container_client(container_name)

    def data_upload(data_stream: BytesIO, file_name: str):
        # Upload an in-memory stream as a blob named with the configured prefix.
        uploaded_file_name = file_prefix + file_name
        data_stream.seek(0)
        size = len(data_stream.read())
        data_stream.seek(0)

        LOGGER.info(T("coal.common.data_transfer.sending_data").format(size=size))
        container_client.upload_blob(name=uploaded_file_name, data=data_stream, length=size, overwrite=True)

    if output_type == "sqlite":
        # For sqlite output, upload the Store's database file as a single blob.
        _file_path = _s._database_path
        _file_name = "db.sqlite"
        _uploaded_file_name = file_prefix + _file_name
        LOGGER.info(
            T("coal.common.data_transfer.file_sent").format(file_path=_file_path, uploaded_name=_uploaded_file_name)
        )
        with open(_file_path, "rb") as data:
            container_client.upload_blob(name=_uploaded_file_name, data=data, overwrite=True)
    else:
        # For csv/parquet output, serialize each non-empty table with pyarrow and upload it.
        tables = list(_s.list_tables())
        for table_name in tables:
            _data_stream = BytesIO()
            _file_name = None
            _data = _s.get_table(table_name)
            if not len(_data):
                LOGGER.info(T("coal.common.data_transfer.table_empty").format(table_name=table_name))
                continue
            if output_type == "csv":
                _file_name = table_name + ".csv"
                pc.write_csv(_data, _data_stream)
            elif output_type == "parquet":
                _file_name = table_name + ".parquet"
                pq.write_table(_data, _data_stream)
            LOGGER.info(
                T("coal.common.data_transfer.sending_table").format(table_name=table_name, output_type=output_type)
            )
            data_upload(_data_stream, _file_name)
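As a follow-up sketch (not part of the library), the same service-principal credentials can be reused with azure-storage-blob directly to check what was uploaded; ContainerClient.list_blobs accepts a name_starts_with filter that matches the file_prefix used above. All values here are the same placeholders as in the usage example:

from azure.identity import ClientSecretCredential
from azure.storage.blob import BlobServiceClient

# Placeholder values matching the upload call shown earlier.
account_name = "mystorageaccount"
container_name = "my-container"
file_prefix = "run-001/"

container_client = BlobServiceClient(
    account_url=f"https://{account_name}.blob.core.windows.net/",
    credential=ClientSecretCredential(
        tenant_id="00000000-0000-0000-0000-000000000000",
        client_id="00000000-0000-0000-0000-000000000000",
        client_secret="<client-secret>",
    ),
).get_container_client(container_name)

# List the blobs written by dump_store_to_azure under the chosen prefix.
for blob in container_client.list_blobs(name_starts_with=file_prefix):
    print(blob.name, blob.size)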