
cosmotech.coal.aws.s3


S3 bucket operations module.

This module provides functions for interacting with S3 buckets, including uploading, downloading, and deleting files.

create_s3_client(endpoint_url, access_id, secret_key, use_ssl=True, ssl_cert_bundle=None)

Create an S3 client with the given credentials and configuration.

Args:
    endpoint_url: The S3 endpoint URL
    access_id: The AWS access key ID
    secret_key: The AWS secret access key
    use_ssl: Whether to use SSL for the connection
    ssl_cert_bundle: Path to the SSL certificate bundle

Returns:
    An S3 client object

Source code in cosmotech/coal/aws/s3.py
def create_s3_client(
    endpoint_url: str,
    access_id: str,
    secret_key: str,
    use_ssl: bool = True,
    ssl_cert_bundle: Optional[str] = None,
) -> boto3.client:
    """
    Create an S3 client with the given credentials and configuration.

    Args:
        endpoint_url: The S3 endpoint URL
        access_id: The AWS access key ID
        secret_key: The AWS secret access key
        use_ssl: Whether to use SSL for the connection
        ssl_cert_bundle: Path to the SSL certificate bundle

    Returns:
        An S3 client object
    """
    boto3_parameters = {
        "use_ssl": use_ssl,
        "endpoint_url": endpoint_url,
        "aws_access_key_id": access_id,
        "aws_secret_access_key": secret_key,
    }
    if ssl_cert_bundle:
        boto3_parameters["verify"] = ssl_cert_bundle

    return boto3.client("s3", **boto3_parameters)
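
For example, a client for an S3-compatible endpoint can be created as follows. The endpoint URL, credentials, and bucket names used in the examples on this page are placeholders, not real values:

from cosmotech.coal.aws.s3 import create_s3_client

s3_client = create_s3_client(
    endpoint_url="https://s3.example.com",  # placeholder endpoint
    access_id="my-access-key",              # placeholder credentials
    secret_key="my-secret-key",
    use_ssl=True,
    ssl_cert_bundle=None,  # or a path to a custom CA bundle
)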

create_s3_resource(endpoint_url, access_id, secret_key, use_ssl=True, ssl_cert_bundle=None)

Create an S3 resource with the given credentials and configuration.

Args:
    endpoint_url: The S3 endpoint URL
    access_id: The AWS access key ID
    secret_key: The AWS secret access key
    use_ssl: Whether to use SSL for the connection
    ssl_cert_bundle: Path to the SSL certificate bundle

Returns:
    An S3 resource object

Source code in cosmotech/coal/aws/s3.py
def create_s3_resource(
    endpoint_url: str,
    access_id: str,
    secret_key: str,
    use_ssl: bool = True,
    ssl_cert_bundle: Optional[str] = None,
) -> boto3.resource:
    """
    Create an S3 resource with the given credentials and configuration.

    Args:
        endpoint_url: The S3 endpoint URL
        access_id: The AWS access key ID
        secret_key: The AWS secret access key
        use_ssl: Whether to use SSL for the connection
        ssl_cert_bundle: Path to the SSL certificate bundle

    Returns:
        An S3 resource object
    """
    boto3_parameters = {
        "use_ssl": use_ssl,
        "endpoint_url": endpoint_url,
        "aws_access_key_id": access_id,
        "aws_secret_access_key": secret_key,
    }
    if ssl_cert_bundle:
        boto3_parameters["verify"] = ssl_cert_bundle

    return boto3.resource("s3", **boto3_parameters)
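
The resource variant is what the upload, download, and delete helpers below expect, while upload_data_stream takes a client. A minimal sketch, with the same placeholder endpoint and credentials as above:

from cosmotech.coal.aws.s3 import create_s3_resource

s3_resource = create_s3_resource(
    endpoint_url="https://s3.example.com",
    access_id="my-access-key",
    secret_key="my-secret-key",
)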

delete_objects(bucket_name, s3_resource, file_prefix=None)

Delete objects from an S3 bucket, optionally filtered by prefix.

Args:
    bucket_name: Name of the S3 bucket
    s3_resource: S3 resource object
    file_prefix: Optional prefix to filter objects to delete

Source code in cosmotech/coal/aws/s3.py
def delete_objects(
    bucket_name: str,
    s3_resource: boto3.resource,
    file_prefix: Optional[str] = None,
) -> None:
    """
    Delete objects from an S3 bucket, optionally filtered by prefix.

    Args:
        bucket_name: Name of the S3 bucket
        s3_resource: S3 resource object
        file_prefix: Optional prefix to filter objects to delete
    """
    bucket = s3_resource.Bucket(bucket_name)

    if file_prefix:
        bucket_files = bucket.objects.filter(Prefix=file_prefix)
    else:
        bucket_files = bucket.objects.all()

    boto_objects = [{"Key": _file.key} for _file in bucket_files if _file.key != file_prefix]
    if boto_objects:
        LOGGER.info(T("coal.services.azure_storage.deleting_objects").format(objects=boto_objects))
        boto_delete_request = {"Objects": boto_objects}
        bucket.delete_objects(Delete=boto_delete_request)
    else:
        LOGGER.info(T("coal.services.azure_storage.no_objects"))

download_files(target_folder, bucket_name, s3_resource, file_prefix=None)

Download files from an S3 bucket to a local folder.

Args:
    target_folder: Local folder to download files to
    bucket_name: Name of the S3 bucket
    s3_resource: S3 resource object
    file_prefix: Optional prefix to filter objects to download

Source code in cosmotech/coal/aws/s3.py
def download_files(
    target_folder: str,
    bucket_name: str,
    s3_resource: boto3.resource,
    file_prefix: Optional[str] = None,
) -> None:
    """
    Download files from an S3 bucket to a local folder.

    Args:
        target_folder: Local folder to download files to
        bucket_name: Name of the S3 bucket
        s3_resource: S3 resource object
        file_prefix: Optional prefix to filter objects to download
    """
    bucket = s3_resource.Bucket(bucket_name)

    pathlib.Path(target_folder).mkdir(parents=True, exist_ok=True)
    remove_prefix = False
    if file_prefix:
        bucket_files = bucket.objects.filter(Prefix=file_prefix)
        if file_prefix.endswith("/"):
            remove_prefix = True
    else:
        bucket_files = bucket.objects.all()
    for _file in bucket_files:
        if not (path_name := str(_file.key)).endswith("/"):
            target_file = path_name
            if remove_prefix:
                target_file = target_file.removeprefix(file_prefix)
            output_file = f"{target_folder}/{target_file}"
            pathlib.Path(output_file).parent.mkdir(parents=True, exist_ok=True)
            LOGGER.info(T("coal.services.azure_storage.downloading").format(path=path_name, output=output_file))
            bucket.download_file(_file.key, output_file)
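
Note that when file_prefix ends with "/", the prefix is stripped from the local paths, so the contents of that "folder" land directly under target_folder. A sketch with placeholder names:

from cosmotech.coal.aws.s3 import download_files

download_files(
    target_folder="./data",
    bucket_name="my-bucket",
    s3_resource=s3_resource,
    file_prefix="input/",  # trailing "/" strips the prefix from local paths
)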

upload_data_stream(data_stream, bucket_name, s3_client, file_name, file_prefix='')

Upload a data stream to an S3 bucket.

Args:
    data_stream: BytesIO stream containing the data to upload
    bucket_name: Name of the S3 bucket
    s3_client: S3 client object
    file_name: Name of the file to create in the bucket
    file_prefix: Prefix to add to the file name in the bucket

Source code in cosmotech/coal/aws/s3.py
def upload_data_stream(
    data_stream: BytesIO,
    bucket_name: str,
    s3_client: boto3.client,
    file_name: str,
    file_prefix: str = "",
) -> None:
    """
    Upload a data stream to an S3 bucket.

    Args:
        data_stream: BytesIO stream containing the data to upload
        bucket_name: Name of the S3 bucket
        s3_client: S3 client object
        file_name: Name of the file to create in the bucket
        file_prefix: Prefix to add to the file name in the bucket
    """
    uploaded_file_name = file_prefix + file_name
    data_stream.seek(0)
    size = len(data_stream.read())
    data_stream.seek(0)

    LOGGER.info(T("coal.common.data_transfer.sending_data").format(size=size))
    s3_client.upload_fileobj(data_stream, bucket_name, uploaded_file_name)
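
Since the uploaded key is simply file_prefix + file_name, a stream can be placed under a "folder" in the bucket like this (placeholder names; s3_client as created above):

from io import BytesIO

from cosmotech.coal.aws.s3 import upload_data_stream

data = BytesIO(b"id,value\n1,42\n")
# Creates the object "input/records.csv" in the bucket
upload_data_stream(data, "my-bucket", s3_client, "records.csv", file_prefix="input/")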

upload_file(file_path, bucket_name, s3_resource, file_prefix='')

Upload a single file to an S3 bucket.

Args:
    file_path: Path to the file to upload
    bucket_name: Name of the S3 bucket
    s3_resource: S3 resource object
    file_prefix: Prefix to add to the file name in the bucket

Source code in cosmotech/coal/aws/s3.py
def upload_file(
    file_path: pathlib.Path,
    bucket_name: str,
    s3_resource: boto3.resource,
    file_prefix: str = "",
) -> None:
    """
    Upload a single file to an S3 bucket.

    Args:
        file_path: Path to the file to upload
        bucket_name: Name of the S3 bucket
        s3_resource: S3 resource object
        file_prefix: Prefix to add to the file name in the bucket
    """
    uploaded_file_name = file_prefix + file_path.name
    LOGGER.info(T("coal.common.data_transfer.file_sent").format(file_path=file_path, uploaded_name=uploaded_file_name))
    s3_resource.Bucket(bucket_name).upload_file(str(file_path), uploaded_file_name)
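
The uploaded key is file_prefix plus the file's base name, so directories in file_path are not reflected in the bucket. A sketch with placeholder paths:

import pathlib

from cosmotech.coal.aws.s3 import upload_file

# Uploaded as "input/records.csv" (prefix + base name)
upload_file(
    file_path=pathlib.Path("./data/records.csv"),
    bucket_name="my-bucket",
    s3_resource=s3_resource,
    file_prefix="input/",
)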

upload_folder(source_folder, bucket_name, s3_resource, file_prefix='', recursive=False)

Upload files from a folder to an S3 bucket.

Args:
    source_folder: Path to the folder containing files to upload
    bucket_name: Name of the S3 bucket
    s3_resource: S3 resource object
    file_prefix: Prefix to add to the file names in the bucket
    recursive: Whether to recursively upload files from subdirectories

Source code in cosmotech/coal/aws/s3.py
def upload_folder(
    source_folder: str,
    bucket_name: str,
    s3_resource: boto3.resource,
    file_prefix: str = "",
    recursive: bool = False,
) -> None:
    """
    Upload files from a folder to an S3 bucket.

    Args:
        source_folder: Path to the folder containing files to upload
        bucket_name: Name of the S3 bucket
        s3_resource: S3 resource object
        file_prefix: Prefix to add to the file names in the bucket
        recursive: Whether to recursively upload files from subdirectories
    """
    source_path = pathlib.Path(source_folder)
    if not source_path.exists():
        LOGGER.error(T("coal.common.file_operations.not_found").format(source_folder=source_folder))
        raise FileNotFoundError(T("coal.common.file_operations.not_found").format(source_folder=source_folder))

    if source_path.is_dir():
        _source_name = str(source_path)
        for _file_path in source_path.glob("**/*" if recursive else "*"):
            if _file_path.is_file():
                _file_name = str(_file_path).removeprefix(_source_name).removeprefix("/")
                uploaded_file_name = file_prefix + _file_name
                LOGGER.info(
                    T("coal.common.data_transfer.file_sent").format(
                        file_path=_file_path, uploaded_name=uploaded_file_name
                    )
                )
                s3_resource.Bucket(bucket_name).upload_file(str(_file_path), uploaded_file_name)
    else:
        upload_file(source_path, bucket_name, s3_resource, file_prefix)
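
Each file is uploaded under file_prefix plus its path relative to source_folder; the prefix is concatenated as-is, so include a trailing "/" if a folder-like layout is wanted. A sketch with placeholder names:

from cosmotech.coal.aws.s3 import upload_folder

upload_folder(
    source_folder="./data",
    bucket_name="my-bucket",
    s3_resource=s3_resource,
    file_prefix="input/",
    recursive=True,  # also upload files from subdirectories
)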