
Working with the CosmoTech API

Objectives

  • Understand how to authenticate and connect to the CosmoTech API
  • Learn to work with workspaces for file management
  • Master the Twin Data Layer for graph data operations
  • Implement runner and run data management
  • Build complete workflows integrating multiple API features

Introduction to the CosmoTech API Integration

The CosmoTech Acceleration Library (CoAL) provides a comprehensive set of tools for interacting with the CosmoTech API. This integration allows you to:

  • Authenticate with different identity providers
  • Manage workspaces and files
  • Work with the Twin Data Layer for graph data
  • Handle runners and runs
  • Process and transform data
  • Build end-to-end workflows

The API integration is organized into several modules, each focused on specific functionality:

  • connection: Authentication and API client management
  • workspace: Workspace file operations
  • twin_data_layer: Graph data management
  • runner: Runner and run data operations
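
For reference, these modules are imported as follows (the same imports used throughout the examples on this page):

from cosmotech.coal.cosmotech_api.connection import get_api_client
from cosmotech.coal.cosmotech_api.workspace import list_workspace_files
from cosmotech.coal.cosmotech_api.twin_data_layer import CSVSourceFile
from cosmotech.coal.cosmotech_api.runner import get_runner_data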

API vs CLI

While the csm-data CLI provides command-line tools for many common operations, the direct API integration offers more flexibility and programmatic control. Use the API integration when you need to:

  • Build custom workflows
  • Integrate with other Python code
  • Perform complex operations not covered by the CLI
  • Implement real-time interactions with the platform

Authentication and Connection

The first step in working with the CosmoTech API is establishing a connection. CoAL supports multiple authentication methods:

  • API Key authentication
  • Azure Entra (formerly Azure AD) authentication
  • Keycloak authentication

The get_api_client() function automatically detects which authentication method to use based on the environment variables you've set.

Basic connection setup
# Example: Setting up connections to the CosmoTech API
import os
from cosmotech.coal.cosmotech_api.connection import get_api_client
from cosmotech.coal.utils.logger import LOGGER

# Method 1: Using API Key (set these environment variables before running)
os.environ["CSM_API_URL"] = "https://api.cosmotech.com"  # Replace with your API URL
os.environ["CSM_API_KEY"] = "your-api-key"  # Replace with your actual API key

# Get the API client
api_client, connection_type = get_api_client()
LOGGER.info(f"Connected using: {connection_type}")

# Use the client with various API instances
from cosmotech_api.api.organization_api import OrganizationApi

org_api = OrganizationApi(api_client)

# List organizations
organizations = org_api.find_all_organizations()
for org in organizations:
    print(f"Organization: {org.name} (ID: {org.id})")

# Don't forget to close the client when done
api_client.close()

# Method 2: Using Azure Entra (set these environment variables before running)
"""
os.environ["CSM_API_URL"] = "https://api.cosmotech.com"  # Replace with your API URL
os.environ["CSM_API_SCOPE"] = "api://your-app-id/.default"  # Replace with your API scope
os.environ["AZURE_CLIENT_ID"] = "your-client-id"  # Replace with your client ID
os.environ["AZURE_CLIENT_SECRET"] = "your-client-secret"  # Replace with your client secret
os.environ["AZURE_TENANT_ID"] = "your-tenant-id"  # Replace with your tenant ID

# Get the API client
api_client, connection_type = get_api_client()
LOGGER.info(f"Connected using: {connection_type}")

# Use the client with various API instances
# ...

# Don't forget to close the client when done
api_client.close()
"""

# Method 3: Using Keycloak (set these environment variables before running)
"""
os.environ["CSM_API_URL"] = "https://api.cosmotech.com"  # Replace with your API URL
os.environ["IDP_BASE_URL"] = "https://keycloak.example.com/auth/"  # Replace with your Keycloak URL
os.environ["IDP_TENANT_ID"] = "your-realm"  # Replace with your realm
os.environ["IDP_CLIENT_ID"] = "your-client-id"  # Replace with your client ID
os.environ["IDP_CLIENT_SECRET"] = "your-client-secret"  # Replace with your client secret

# Get the API client
api_client, connection_type = get_api_client()
LOGGER.info(f"Connected using: {connection_type}")

# Use the client with various API instances
# ...

# Don't forget to close the client when done
api_client.close()
"""

Environment Variables

You can set environment variables in your code for testing, but in production environments, it's better to set them at the system or container level for security.

API Key Authentication

API Key authentication is the simplest method and requires two environment variables:

  • CSM_API_URL: The URL of the CosmoTech API
  • CSM_API_KEY: Your API key

Azure Entra Authentication

Azure Entra authentication uses service principal credentials and requires these environment variables:

  • CSM_API_URL: The URL of the CosmoTech API
  • CSM_API_SCOPE: The API scope (usually in the format api://app-id/.default)
  • AZURE_CLIENT_ID: Your client ID
  • AZURE_CLIENT_SECRET: Your client secret
  • AZURE_TENANT_ID: Your tenant ID

Keycloak Authentication

Keycloak authentication requires these environment variables:

  • CSM_API_URL: The URL of the CosmoTech API
  • IDP_BASE_URL: The base URL of your Keycloak server
  • IDP_TENANT_ID: Your realm name
  • IDP_CLIENT_ID: Your client ID
  • IDP_CLIENT_SECRET: Your client secret

API Client Lifecycle

Always close the API client when you're done using it to release resources. The best practice is to use a try/finally block to ensure the client is closed even if an error occurs.
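
A minimal sketch of this pattern, using get_api_client from the connection module:

from cosmotech.coal.cosmotech_api.connection import get_api_client

api_client, connection_type = get_api_client()
try:
    ...  # your API operations here
finally:
    # Runs even if an operation above raised an exception
    api_client.close()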

Working with Workspaces

Workspaces in the CosmoTech platform provide a way to organize and share files. The CoAL library offers functions for listing, downloading, and uploading files in workspaces.

Workspace operations
# Example: Working with workspaces in the CosmoTech API
import os
import pathlib
from cosmotech.coal.cosmotech_api.connection import get_api_client
from cosmotech.coal.cosmotech_api.workspace import (
    list_workspace_files,
    download_workspace_file,
    upload_workspace_file,
)
from cosmotech.coal.utils.logger import LOGGER

# Set up environment variables for authentication
os.environ["CSM_API_URL"] = "https://api.cosmotech.com"  # Replace with your API URL
os.environ["CSM_API_KEY"] = "your-api-key"  # Replace with your actual API key

# Organization and workspace IDs
organization_id = "your-organization-id"  # Replace with your organization ID
workspace_id = "your-workspace-id"  # Replace with your workspace ID

# Get the API client
api_client, connection_type = get_api_client()
LOGGER.info(f"Connected using: {connection_type}")

try:
    # Example 1: List files in a workspace with a specific prefix
    file_prefix = "data/"  # List files in the "data" directory
    try:
        files = list_workspace_files(api_client, organization_id, workspace_id, file_prefix)
        print(f"Files in workspace with prefix '{file_prefix}':")
        for file in files:
            print(f"  - {file}")
    except ValueError as e:
        print(f"Error listing files: {e}")

    # Example 2: Download a file from the workspace
    file_to_download = "data/sample.csv"  # Replace with an actual file in your workspace
    target_directory = pathlib.Path("./downloaded_files")
    target_directory.mkdir(exist_ok=True, parents=True)

    try:
        downloaded_file = download_workspace_file(
            api_client, organization_id, workspace_id, file_to_download, target_directory
        )
        print(f"Downloaded file to: {downloaded_file}")
    except Exception as e:
        print(f"Error downloading file: {e}")

    # Example 3: Upload a file to the workspace
    file_to_upload = "./local_data/upload_sample.csv"  # Replace with a local file path
    workspace_destination = "data/uploaded/"  # Destination in the workspace (ending with / to keep filename)

    try:
        uploaded_file = upload_workspace_file(
            api_client,
            organization_id,
            workspace_id,
            file_to_upload,
            workspace_destination,
            overwrite=True,  # Set to False to prevent overwriting existing files
        )
        print(f"Uploaded file as: {uploaded_file}")
    except Exception as e:
        print(f"Error uploading file: {e}")

finally:
    # Always close the API client when done
    api_client.close()

Listing Files

The list_workspace_files function allows you to list files in a workspace with a specific prefix:

files = list_workspace_files(api_client, organization_id, workspace_id, file_prefix)

This is useful for finding files in a specific directory or with a specific naming pattern.

Downloading Files

The download_workspace_file function downloads a file from the workspace to a local directory:

downloaded_file = download_workspace_file(
    api_client, 
    organization_id, 
    workspace_id, 
    file_to_download, 
    target_directory
)

If the file is in a subdirectory in the workspace, the function will create the necessary local subdirectories.

Uploading Files

The upload_workspace_file function uploads a local file to the workspace:

uploaded_file = upload_workspace_file(
    api_client,
    organization_id,
    workspace_id,
    file_to_upload,
    workspace_destination,
    overwrite=True
)

The workspace_destination parameter can be:

  • A specific file path in the workspace
  • A directory path ending with /, in which case the original filename is preserved
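
For example, assuming a local file ./local_data/report.csv (both paths below are illustrative):

# Upload under an explicit name in the workspace
upload_workspace_file(api_client, organization_id, workspace_id,
                      "./local_data/report.csv", "data/report_v2.csv", overwrite=True)

# Upload into a directory; the original filename "report.csv" is kept
upload_workspace_file(api_client, organization_id, workspace_id,
                      "./local_data/report.csv", "data/reports/", overwrite=True)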

Workspace Paths

When working with workspace paths:

  • Use forward slashes (/) regardless of your operating system
  • End directory paths with a trailing slash (/)
  • Use relative paths from the workspace root

Twin Data Layer Operations

The Twin Data Layer (TDL) is a graph database that stores nodes and relationships. CoAL provides tools for working with the TDL, particularly for preparing and sending CSV data.

Twin Data Layer operations
# Example: Working with the Twin Data Layer in the CosmoTech API
import os
import pathlib
import csv
from cosmotech.coal.cosmotech_api.connection import get_api_client
from cosmotech.coal.cosmotech_api.twin_data_layer import CSVSourceFile
from cosmotech_api.api.twin_graph_api import TwinGraphApi
from cosmotech.coal.utils.logger import LOGGER

# Set up environment variables for authentication
os.environ["CSM_API_URL"] = "https://api.cosmotech.com"  # Replace with your API URL
os.environ["CSM_API_KEY"] = "your-api-key"  # Replace with your actual API key

# Organization and workspace IDs
organization_id = "your-organization-id"  # Replace with your organization ID
workspace_id = "your-workspace-id"  # Replace with your workspace ID
twin_graph_id = "your-twin-graph-id"  # Replace with your twin graph ID

# Get the API client
api_client, connection_type = get_api_client()
LOGGER.info(f"Connected using: {connection_type}")

try:
    # Create a TwinGraphApi instance
    twin_graph_api = TwinGraphApi(api_client)

    # Example 1: Create sample CSV files for nodes and relationships

    # Create a directory for our sample data
    data_dir = pathlib.Path("./tdl_sample_data")
    data_dir.mkdir(exist_ok=True, parents=True)

    # Create a sample nodes CSV file (Person nodes)
    persons_file = data_dir / "Person.csv"
    with open(persons_file, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["id", "name", "age", "city"])
        writer.writerow(["p1", "Alice", "30", "New York"])
        writer.writerow(["p2", "Bob", "25", "San Francisco"])
        writer.writerow(["p3", "Charlie", "35", "Chicago"])

    # Create a sample relationships CSV file (KNOWS relationships)
    knows_file = data_dir / "KNOWS.csv"
    with open(knows_file, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["src", "dest", "since"])
        writer.writerow(["p1", "p2", "2020"])
        writer.writerow(["p2", "p3", "2021"])
        writer.writerow(["p3", "p1", "2019"])

    print(f"Created sample CSV files in {data_dir}")

    # Example 2: Parse CSV files and generate Cypher queries

    # Parse the nodes CSV file
    person_csv = CSVSourceFile(persons_file)
    print(f"Parsed {person_csv.object_type} CSV file:")
    print(f"  Is node: {person_csv.is_node}")
    print(f"  Fields: {person_csv.fields}")
    print(f"  ID column: {person_csv.id_column}")

    # Generate a Cypher query for creating nodes
    person_query = person_csv.generate_query_insert()
    print(f"\nGenerated Cypher query for {person_csv.object_type}:")
    print(person_query)

    # Parse the relationships CSV file
    knows_csv = CSVSourceFile(knows_file)
    print(f"\nParsed {knows_csv.object_type} CSV file:")
    print(f"  Is node: {knows_csv.is_node}")
    print(f"  Fields: {knows_csv.fields}")
    print(f"  Source column: {knows_csv.source_column}")
    print(f"  Target column: {knows_csv.target_column}")

    # Generate a Cypher query for creating relationships
    knows_query = knows_csv.generate_query_insert()
    print(f"\nGenerated Cypher query for {knows_csv.object_type}:")
    print(knows_query)

    # Example 3: Send data to the Twin Data Layer (commented out as it requires an actual twin graph)
    """
    # For nodes, you would typically:
    with open(persons_file, "r") as f:
        reader = csv.DictReader(f)
        for row in reader:
            # Create parameters for the Cypher query
            params = {k: v for k, v in row.items()}

            # Execute the query
            twin_graph_api.run_twin_graph_cypher_query(
                organization_id=organization_id,
                workspace_id=workspace_id,
                twin_graph_id=twin_graph_id,
                twin_graph_cypher_query={
                    "query": person_query,
                    "parameters": params
                }
            )

    # For relationships, you would typically:
    with open(knows_file, "r") as f:
        reader = csv.DictReader(f)
        for row in reader:
            # Create parameters for the Cypher query
            params = {k: v for k, v in row.items()}

            # Execute the query
            twin_graph_api.run_twin_graph_cypher_query(
                organization_id=organization_id,
                workspace_id=workspace_id,
                twin_graph_id=twin_graph_id,
                twin_graph_cypher_query={
                    "query": knows_query,
                    "parameters": params
                }
            )
    """

    # Example 4: Query data from the Twin Data Layer (commented out as it requires an actual twin graph)
    """
    # Execute a Cypher query to get all Person nodes
    result = twin_graph_api.run_twin_graph_cypher_query(
        organization_id=organization_id,
        workspace_id=workspace_id,
        twin_graph_id=twin_graph_id,
        twin_graph_cypher_query={
            "query": "MATCH (p:Person) RETURN p.id, p.name, p.age, p.city",
            "parameters": {}
        }
    )

    # Process the results
    print("\nPerson nodes in the Twin Data Layer:")
    for record in result.records:
        print(f"  - {record}")
    """

finally:
    # Always close the API client when done
    api_client.close()

CSV File Format

The TDL expects CSV files in a specific format:

  • Node files: Must have an id column and can have additional property columns
  • Relationship files: Must have src and dest columns and can have additional property columns

The filename (without the .csv extension) becomes the node label or relationship type in the graph.

Parsing CSV Files

The CSVSourceFile class helps parse CSV files and determine if they represent nodes or relationships:

csv_file = CSVSourceFile(file_path)
print(f"Is node: {csv_file.is_node}")
print(f"Fields: {csv_file.fields}")

Generating Cypher Queries

The generate_query_insert method creates Cypher queries for inserting data into the TDL:

query = csv_file.generate_query_insert()

These queries can then be executed using the TwinGraphApi:

twin_graph_api.run_twin_graph_cypher_query(
    organization_id=organization_id,
    workspace_id=workspace_id,
    twin_graph_id=twin_graph_id,
    twin_graph_cypher_query={
        "query": query,
        "parameters": params
    }
)

Node References

When creating relationships, make sure the nodes referenced by the src and dest columns already exist in the graph. Otherwise, the relationship creation will fail.
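
A safe pattern is therefore to load every node file before any relationship file. Here is a sketch reusing persons_file, knows_file, and the queries generated in the example above:

# Nodes first, then relationships, so that src/dest references can be resolved
for csv_path, query in [(persons_file, person_query), (knows_file, knows_query)]:
    with open(csv_path, "r") as f:
        for row in csv.DictReader(f):
            twin_graph_api.run_twin_graph_cypher_query(
                organization_id=organization_id,
                workspace_id=workspace_id,
                twin_graph_id=twin_graph_id,
                twin_graph_cypher_query={"query": query, "parameters": dict(row)},
            )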

Runner and Run Management

Runners and runs are central concepts in the CosmoTech platform. CoAL provides functions for working with runner data, parameters, and associated datasets.

Runner operations
# Example: Working with runners and runs in the CosmoTech API
import os
import pathlib
from cosmotech.coal.cosmotech_api.connection import get_api_client
from cosmotech.coal.cosmotech_api.runner import (
    get_runner_data,
    get_runner_parameters,
    download_runner_data,
    download_datasets,
)
from cosmotech.coal.utils.logger import LOGGER

# Set up environment variables for authentication
os.environ["CSM_API_URL"] = "https://api.cosmotech.com"  # Replace with your API URL
os.environ["CSM_API_KEY"] = "your-api-key"  # Replace with your actual API key

# Organization, workspace, and runner IDs
organization_id = "your-organization-id"  # Replace with your organization ID
workspace_id = "your-workspace-id"  # Replace with your workspace ID
runner_id = "your-runner-id"  # Replace with your runner ID

# Get the API client
api_client, connection_type = get_api_client()
LOGGER.info(f"Connected using: {connection_type}")

try:
    # Example 1: Get runner data
    runner_data = get_runner_data(organization_id, workspace_id, runner_id)
    print(f"Runner name: {runner_data.name}")
    print(f"Runner ID: {runner_data.id}")
    print(f"Runner state: {runner_data.state}")

    # Example 2: Get runner parameters
    parameters = get_runner_parameters(runner_data)
    print("\nRunner parameters:")
    for param in parameters:
        print(f"  - {param['parameterId']}: {param['value']} (type: {param['varType']})")

    # Example 3: Download runner data (parameters and datasets)
    # Create directories for parameters and datasets
    param_dir = pathlib.Path("./runner_parameters")
    dataset_dir = pathlib.Path("./runner_datasets")
    param_dir.mkdir(exist_ok=True, parents=True)
    dataset_dir.mkdir(exist_ok=True, parents=True)

    # Download runner data
    result = download_runner_data(
        organization_id=organization_id,
        workspace_id=workspace_id,
        runner_id=runner_id,
        parameter_folder=str(param_dir),
        dataset_folder=str(dataset_dir),
        read_files=True,  # Read file contents
        parallel=True,  # Download datasets in parallel
        write_json=True,  # Write parameters as JSON
        write_csv=True,  # Write parameters as CSV
        fetch_dataset=True,  # Fetch datasets
    )

    print("\nDownloaded runner data:")
    print(f"  - Parameters saved to: {param_dir}")
    print(f"  - Datasets saved to: {dataset_dir}")

    # Example 4: Working with specific datasets
    if result["datasets"]:
        print("\nDatasets associated with the runner:")
        for dataset_id, dataset_info in result["datasets"].items():
            print(f"  - Dataset ID: {dataset_id}")
            print(f"    Name: {dataset_info.get('name', 'N/A')}")

            # List files in the dataset
            if "files" in dataset_info:
                print(f"    Files:")
                for file_info in dataset_info["files"]:
                    print(f"      - {file_info.get('name', 'N/A')}")
    else:
        print("\nNo datasets associated with this runner.")

    # Example 5: Download specific datasets
    """
    from cosmotech.coal.cosmotech_api.runner import get_dataset_ids_from_runner

    # Get dataset IDs from the runner
    dataset_ids = get_dataset_ids_from_runner(runner_data)

    if dataset_ids:
        # Create a directory for the datasets
        specific_dataset_dir = pathlib.Path("./specific_datasets")
        specific_dataset_dir.mkdir(exist_ok=True, parents=True)

        # Download the datasets
        datasets = download_datasets(
            organization_id=organization_id,
            workspace_id=workspace_id,
            dataset_ids=dataset_ids,
            read_files=True,
            parallel=True,
        )

        print("\nDownloaded specific datasets:")
        for dataset_id, dataset_info in datasets.items():
            print(f"  - Dataset ID: {dataset_id}")
            print(f"    Name: {dataset_info.get('name', 'N/A')}")
    """

finally:
    # Always close the API client when done
    api_client.close()

Getting Runner Data

The get_runner_data function retrieves information about a runner:

runner_data = get_runner_data(organization_id, workspace_id, runner_id)

Working with Parameters

The get_runner_parameters function extracts parameters from runner data:

parameters = get_runner_parameters(runner_data)

Downloading Runner Data

The download_runner_data function downloads all data associated with a runner, including parameters and datasets:

result = download_runner_data(
    organization_id=organization_id,
    workspace_id=workspace_id,
    runner_id=runner_id,
    parameter_folder=str(param_dir),
    dataset_folder=str(dataset_dir),
    write_json=True,
    write_csv=True,
    fetch_dataset=True,
)

This function:

  • Downloads parameters and writes them as JSON and/or CSV files
  • Downloads associated datasets
  • Organizes everything in the specified directories

Dataset References

Runners can reference datasets in two ways:

  • Through parameters with the %DATASETID% variable type
  • Through the dataset_list property

The download_runner_data function handles both types of references.
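
If you only need the dataset IDs themselves, the get_dataset_ids_from_runner helper (also shown in Example 5 above) extracts them from the runner data:

from cosmotech.coal.cosmotech_api.runner import get_dataset_ids_from_runner

dataset_ids = get_dataset_ids_from_runner(runner_data)
print(f"Runner references {len(dataset_ids)} dataset(s): {dataset_ids}")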

Complete Workflow Example

Putting it all together, here's a complete workflow that demonstrates how to use the CosmoTech API for a data processing pipeline:

Complete workflow
# Example: Complete workflow using the CosmoTech API
import os
import pathlib
import json
import csv
import datetime
from cosmotech.coal.cosmotech_api.connection import get_api_client
from cosmotech.coal.cosmotech_api.runner import (
    get_runner_data,
    download_runner_data,
)
from cosmotech.coal.cosmotech_api.workspace import (
    list_workspace_files,
    download_workspace_file,
    upload_workspace_file,
)
from cosmotech.coal.cosmotech_api.twin_data_layer import CSVSourceFile
from cosmotech_api.api.twin_graph_api import TwinGraphApi
from cosmotech_api.api.dataset_api import DatasetApi
from cosmotech.coal.utils.logger import LOGGER

# Set up environment variables for authentication
os.environ["CSM_API_URL"] = "https://api.cosmotech.com"  # Replace with your API URL
os.environ["CSM_API_KEY"] = "your-api-key"  # Replace with your actual API key

# Organization, workspace, and runner IDs
organization_id = "your-organization-id"  # Replace with your organization ID
workspace_id = "your-workspace-id"  # Replace with your workspace ID
runner_id = "your-runner-id"  # Replace with your runner ID
twin_graph_id = "your-twin-graph-id"  # Replace with your twin graph ID

# Create directories for our workflow
workflow_dir = pathlib.Path("./workflow_example")
workflow_dir.mkdir(exist_ok=True, parents=True)

input_dir = workflow_dir / "input"
processed_dir = workflow_dir / "processed"
output_dir = workflow_dir / "output"

input_dir.mkdir(exist_ok=True, parents=True)
processed_dir.mkdir(exist_ok=True, parents=True)
output_dir.mkdir(exist_ok=True, parents=True)

# Get the API client
api_client, connection_type = get_api_client()
LOGGER.info(f"Connected using: {connection_type}")

try:
    # Step 1: Download runner data (parameters and datasets)
    print("\n=== Step 1: Download Runner Data ===")

    runner_data = get_runner_data(organization_id, workspace_id, runner_id)
    print(f"Runner name: {runner_data.name}")

    result = download_runner_data(
        organization_id=organization_id,
        workspace_id=workspace_id,
        runner_id=runner_id,
        parameter_folder=str(input_dir / "parameters"),
        dataset_folder=str(input_dir / "datasets"),
        write_json=True,
        write_csv=True,
    )

    print(f"Downloaded runner data to {input_dir}")

    # Step 2: Process the data
    print("\n=== Step 2: Process Data ===")

    # For this example, we'll create a simple transformation:
    # - Read a CSV file from the input
    # - Transform it
    # - Write the result to the processed directory

    # Let's assume we have a "customers.csv" file in the input directory
    customers_file = input_dir / "datasets" / "customers.csv"

    # If the file doesn't exist for this example, create a sample one
    if not customers_file.exists():
        print("Creating sample customers.csv file for demonstration")
        customers_file.parent.mkdir(exist_ok=True, parents=True)
        with open(customers_file, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["id", "name", "age", "city", "spending"])
            writer.writerow(["c1", "Alice", "30", "New York", "1500"])
            writer.writerow(["c2", "Bob", "25", "San Francisco", "2000"])
            writer.writerow(["c3", "Charlie", "35", "Chicago", "1200"])

    # Read the customers data
    customers = []
    with open(customers_file, "r") as f:
        reader = csv.DictReader(f)
        for row in reader:
            customers.append(row)

    print(f"Read {len(customers)} customers from {customers_file}")

    # Process the data: calculate a loyalty score based on age and spending
    for customer in customers:
        age = int(customer["age"])
        spending = int(customer["spending"])

        # Simple formula: loyalty score = spending / 100 + (age - 20) / 10
        loyalty_score = round(spending / 100 + (age - 20) / 10, 1)
        customer["loyalty_score"] = str(loyalty_score)

    # Write the processed data
    processed_file = processed_dir / "customers_with_loyalty.csv"
    with open(processed_file, "w", newline="") as f:
        fieldnames = ["id", "name", "age", "city", "spending", "loyalty_score"]
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(customers)

    print(f"Processed data written to {processed_file}")

    # Step 3: Upload the processed file to the workspace
    print("\n=== Step 3: Upload Processed Data to Workspace ===")

    try:
        uploaded_file = upload_workspace_file(
            api_client,
            organization_id,
            workspace_id,
            str(processed_file),
            "processed_data/",  # Destination in the workspace
            overwrite=True,
        )
        print(f"Uploaded processed file as: {uploaded_file}")
    except Exception as e:
        print(f"Error uploading file: {e}")

    # Step 4: Create a dataset from the processed data
    print("\n=== Step 4: Create Dataset from Processed Data ===")

    # This step would typically involve:
    # 1. Creating a dataset in the CosmoTech API
    # 2. Uploading files to the dataset

    """
    # Create a dataset
    dataset_api = DatasetApi(api_client)

    new_dataset = {
        "name": "Customers with Loyalty Scores",
        "description": "Processed customer data with calculated loyalty scores",
        "tags": ["processed", "customers", "loyalty"]
    }

    try:
        dataset = dataset_api.create_dataset(
            organization_id=organization_id,
            workspace_id=workspace_id,
            dataset=new_dataset
        )

        dataset_id = dataset.id
        print(f"Created dataset with ID: {dataset_id}")

        # Upload the processed file to the dataset
        # This would typically involve additional API calls
        # ...

    except Exception as e:
        print(f"Error creating dataset: {e}")
    """

    # Step 5: Send data to the Twin Data Layer
    print("\n=== Step 5: Send Data to Twin Data Layer ===")

    # Parse the processed CSV file for the Twin Data Layer
    customer_csv = CSVSourceFile(processed_file)

    # Generate a Cypher query for creating nodes
    customer_query = customer_csv.generate_query_insert()
    print(f"Generated Cypher query for Customer nodes:")
    print(customer_query)

    # In a real scenario, you would send this data to the Twin Data Layer
    """
    twin_graph_api = TwinGraphApi(api_client)

    # For each customer, create a node in the Twin Data Layer
    with open(processed_file, "r") as f:
        reader = csv.DictReader(f)
        for row in reader:
            # Create parameters for the Cypher query
            params = {k: v for k, v in row.items()}

            # Execute the query
            twin_graph_api.run_twin_graph_cypher_query(
                organization_id=organization_id,
                workspace_id=workspace_id,
                twin_graph_id=twin_graph_id,
                twin_graph_cypher_query={
                    "query": customer_query,
                    "parameters": params
                }
            )
    """

    # Step 6: Generate a report
    print("\n=== Step 6: Generate Report ===")

    # Calculate some statistics
    total_customers = len(customers)
    avg_age = sum(int(c["age"]) for c in customers) / total_customers
    avg_spending = sum(int(c["spending"]) for c in customers) / total_customers
    avg_loyalty = sum(float(c["loyalty_score"]) for c in customers) / total_customers

    # Create a report
    report = {
        "report_date": "2025-02-28",
        "runner_id": runner_id,
        "statistics": {
            "total_customers": total_customers,
            "average_age": round(avg_age, 1),
            "average_spending": round(avg_spending, 2),
            "average_loyalty_score": round(avg_loyalty, 1),
        },
        "top_customers": sorted(customers, key=lambda c: float(c["loyalty_score"]), reverse=True)[
            :2
        ],  # Top 2 customers by loyalty score
    }

    # Write the report to a JSON file
    report_file = output_dir / "customer_report.json"
    with open(report_file, "w") as f:
        json.dump(report, f, indent=2)

    print(f"Report generated and saved to {report_file}")

    # Print a summary of the report
    print("\nReport Summary:")
    print(f"Total Customers: {report['statistics']['total_customers']}")
    print(f"Average Age: {report['statistics']['average_age']}")
    print(f"Average Spending: {report['statistics']['average_spending']}")
    print(f"Average Loyalty Score: {report['statistics']['average_loyalty_score']}")
    print("\nTop Customers by Loyalty Score:")
    for i, customer in enumerate(report["top_customers"], 1):
        print(f"{i}. {customer['name']} (Score: {customer['loyalty_score']})")

    print("\nWorkflow completed successfully!")

finally:
    # Always close the API client when done
    api_client.close()

This workflow:

  1. Downloads runner data (parameters and datasets)
  2. Processes the data (calculates loyalty scores for customers)
  3. Uploads the processed data to the workspace
  4. Prepares the data for the Twin Data Layer
  5. Generates a report with statistics and insights

Real-world Workflows

In real-world scenarios, you might:

  • Use more complex data transformations
  • Integrate with external systems
  • Implement error handling and retries
  • Add logging and monitoring
  • Parallelize operations for better performance

Best Practices and Tips

Authentication

  • Use environment variables for credentials
  • Implement proper secret management in production
  • Always close API clients when done

Error Handling

from cosmotech_api.exceptions import ApiException

try:
    ...  # API operations
except ApiException as e:
    # Handle API errors (bad IDs, missing permissions, etc.)
    print(f"API error: {e.status} - {e.reason}")
except Exception as e:
    # Handle other errors (network issues, local I/O, etc.)
    print(f"Error: {e}")
finally:
    # Always close the client
    api_client.close()

Performance Considerations

  • Download datasets in parallel when possible (parallel=True)
  • Batch operations when sending multiple items to the API
  • Use appropriate error handling and retries for network operations
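
For transient network failures, a small retry helper can wrap any API call. This is an illustrative sketch (with_retries is not part of CoAL; tune the attempts and delays to your workload):

import time
from cosmotech_api.exceptions import ApiException

def with_retries(operation, max_attempts=3, base_delay=1.0):
    """Run a zero-argument callable, retrying API errors with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except ApiException as e:
            if attempt == max_attempts:
                raise
            delay = base_delay * 2 ** (attempt - 1)
            print(f"API error {e.status}, retrying in {delay}s ({attempt}/{max_attempts})")
            time.sleep(delay)

# Usage:
# organizations = with_retries(lambda: org_api.find_all_organizations())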

Security

  • Never hardcode credentials in your code
  • Use the principle of least privilege for API keys and service principals
  • Validate and sanitize inputs before sending them to the API

Conclusion

The CosmoTech API integration in CoAL provides a powerful way to interact with the CosmoTech platform programmatically. By leveraging these capabilities, you can:

  • Automate workflows
  • Integrate with other systems
  • Build custom applications
  • Process and analyze data
  • Create end-to-end solutions

Whether you're building data pipelines, creating custom interfaces, or integrating with existing systems, the CoAL library's API integration offers the tools you need to work effectively with the CosmoTech platform.