Working with the CosmoTech API
Objectives
- Understand how to authenticate and connect to the CosmoTech API
- Learn to work with workspaces for file management
- Master the Twin Data Layer for graph data operations
- Implement runner and run data management
- Build complete workflows integrating multiple API features
Introduction to the CosmoTech API Integration
The CosmoTech Acceleration Library (CoAL) provides a comprehensive set of tools for interacting with the CosmoTech API. This integration allows you to:
- Authenticate with different identity providers
- Manage workspaces and files
- Work with the Twin Data Layer for graph data
- Handle runners and runs
- Process and transform data
- Build end-to-end workflows
The API integration is organized into several modules, each focused on specific functionality:
- `connection`: Authentication and API client management
- `workspace`: Workspace file operations
- `twin_data_layer`: Graph data management
- `runner`: Runner and run data operations
API vs CLI
While the `csm-data` CLI provides command-line tools for many common operations, the direct API integration offers more flexibility and programmatic control. Use the API integration when you need to:
- Build custom workflows
- Integrate with other Python code
- Perform complex operations not covered by the CLI
- Implement real-time interactions with the platform
Authentication and Connection
The first step in working with the CosmoTech API is establishing a connection. CoAL supports multiple authentication methods:
- API Key authentication
- Azure Entra (formerly Azure AD) authentication
- Keycloak authentication
The `get_api_client()` function automatically detects which authentication method to use based on the environment variables you've set.
Basic connection setup:

```python
# Example: Setting up connections to the CosmoTech API
import os

from cosmotech.coal.cosmotech_api.connection import get_api_client
from cosmotech.coal.utils.logger import LOGGER

# Method 1: Using API Key (set these environment variables before running)
os.environ["CSM_API_URL"] = "https://api.cosmotech.com"  # Replace with your API URL
os.environ["CSM_API_KEY"] = "your-api-key"  # Replace with your actual API key

# Get the API client
api_client, connection_type = get_api_client()
LOGGER.info(f"Connected using: {connection_type}")

# Use the client with various API instances
from cosmotech_api.api.organization_api import OrganizationApi

org_api = OrganizationApi(api_client)

# List organizations
organizations = org_api.find_all_organizations()
for org in organizations:
    print(f"Organization: {org.name} (ID: {org.id})")

# Don't forget to close the client when done
api_client.close()

# Method 2: Using Azure Entra (set these environment variables before running)
"""
os.environ["CSM_API_URL"] = "https://api.cosmotech.com"  # Replace with your API URL
os.environ["CSM_API_SCOPE"] = "api://your-app-id/.default"  # Replace with your API scope
os.environ["AZURE_CLIENT_ID"] = "your-client-id"  # Replace with your client ID
os.environ["AZURE_CLIENT_SECRET"] = "your-client-secret"  # Replace with your client secret
os.environ["AZURE_TENANT_ID"] = "your-tenant-id"  # Replace with your tenant ID

# Get the API client
api_client, connection_type = get_api_client()
LOGGER.info(f"Connected using: {connection_type}")

# Use the client with various API instances
# ...

# Don't forget to close the client when done
api_client.close()
"""

# Method 3: Using Keycloak (set these environment variables before running)
"""
os.environ["CSM_API_URL"] = "https://api.cosmotech.com"  # Replace with your API URL
os.environ["IDP_BASE_URL"] = "https://keycloak.example.com/auth/"  # Replace with your Keycloak URL
os.environ["IDP_TENANT_ID"] = "your-realm"  # Replace with your realm
os.environ["IDP_CLIENT_ID"] = "your-client-id"  # Replace with your client ID
os.environ["IDP_CLIENT_SECRET"] = "your-client-secret"  # Replace with your client secret

# Get the API client
api_client, connection_type = get_api_client()
LOGGER.info(f"Connected using: {connection_type}")

# Use the client with various API instances
# ...

# Don't forget to close the client when done
api_client.close()
"""
```
Environment Variables
You can set environment variables in your code for testing, but in production environments, it's better to set them at the system or container level for security.
API Key Authentication
API Key authentication is the simplest method and requires two environment variables:
- `CSM_API_URL`: The URL of the CosmoTech API
- `CSM_API_KEY`: Your API key
Azure Entra Authentication
Azure Entra authentication uses service principal credentials and requires these environment variables:
- `CSM_API_URL`: The URL of the CosmoTech API
- `CSM_API_SCOPE`: The API scope (usually in the format `api://app-id/.default`)
- `AZURE_CLIENT_ID`: Your client ID
- `AZURE_CLIENT_SECRET`: Your client secret
- `AZURE_TENANT_ID`: Your tenant ID
Keycloak Authentication
Keycloak authentication requires these environment variables:
- `CSM_API_URL`: The URL of the CosmoTech API
- `IDP_BASE_URL`: The base URL of your Keycloak server
- `IDP_TENANT_ID`: Your realm name
- `IDP_CLIENT_ID`: Your client ID
- `IDP_CLIENT_SECRET`: Your client secret
API Client Lifecycle
Always close the API client when you're done using it to release resources. The best practice is to use a `try`/`finally` block to ensure the client is closed even if an error occurs.
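A minimal sketch of this pattern:

```python
from cosmotech.coal.cosmotech_api.connection import get_api_client

api_client, connection_type = get_api_client()
try:
    # ... use api_client with the API classes shown in the examples ...
    pass
finally:
    # Runs even if an error occurred above
    api_client.close()
```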
Working with Workspaces
Workspaces in the CosmoTech platform provide a way to organize and share files. The CoAL library offers functions for listing, downloading, and uploading files in workspaces.
Workspace operations:

```python
# Example: Working with workspaces in the CosmoTech API
import os
import pathlib

from cosmotech.coal.cosmotech_api.connection import get_api_client
from cosmotech.coal.cosmotech_api.workspace import (
    list_workspace_files,
    download_workspace_file,
    upload_workspace_file,
)
from cosmotech.coal.utils.logger import LOGGER

# Set up environment variables for authentication
os.environ["CSM_API_URL"] = "https://api.cosmotech.com"  # Replace with your API URL
os.environ["CSM_API_KEY"] = "your-api-key"  # Replace with your actual API key

# Organization and workspace IDs
organization_id = "your-organization-id"  # Replace with your organization ID
workspace_id = "your-workspace-id"  # Replace with your workspace ID

# Get the API client
api_client, connection_type = get_api_client()
LOGGER.info(f"Connected using: {connection_type}")

try:
    # Example 1: List files in a workspace with a specific prefix
    file_prefix = "data/"  # List files in the "data" directory
    try:
        files = list_workspace_files(api_client, organization_id, workspace_id, file_prefix)
        print(f"Files in workspace with prefix '{file_prefix}':")
        for file in files:
            print(f"  - {file}")
    except ValueError as e:
        print(f"Error listing files: {e}")

    # Example 2: Download a file from the workspace
    file_to_download = "data/sample.csv"  # Replace with an actual file in your workspace
    target_directory = pathlib.Path("./downloaded_files")
    target_directory.mkdir(exist_ok=True, parents=True)
    try:
        downloaded_file = download_workspace_file(
            api_client, organization_id, workspace_id, file_to_download, target_directory
        )
        print(f"Downloaded file to: {downloaded_file}")
    except Exception as e:
        print(f"Error downloading file: {e}")

    # Example 3: Upload a file to the workspace
    file_to_upload = "./local_data/upload_sample.csv"  # Replace with a local file path
    workspace_destination = "data/uploaded/"  # Destination in the workspace (ending with / to keep filename)
    try:
        uploaded_file = upload_workspace_file(
            api_client,
            organization_id,
            workspace_id,
            file_to_upload,
            workspace_destination,
            overwrite=True,  # Set to False to prevent overwriting existing files
        )
        print(f"Uploaded file as: {uploaded_file}")
    except Exception as e:
        print(f"Error uploading file: {e}")
finally:
    # Always close the API client when done
    api_client.close()
```
Listing Files
The `list_workspace_files` function allows you to list files in a workspace with a specific prefix:

```python
files = list_workspace_files(api_client, organization_id, workspace_id, file_prefix)
```
This is useful for finding files in a specific directory or with a specific naming pattern.
Downloading Files
The `download_workspace_file` function downloads a file from the workspace to a local directory:

```python
downloaded_file = download_workspace_file(
    api_client,
    organization_id,
    workspace_id,
    file_to_download,
    target_directory,
)
```
If the file is in a subdirectory in the workspace, the function will create the necessary local subdirectories.
Uploading Files
The `upload_workspace_file` function uploads a local file to the workspace:

```python
uploaded_file = upload_workspace_file(
    api_client,
    organization_id,
    workspace_id,
    file_to_upload,
    workspace_destination,
    overwrite=True,
)
```

The `workspace_destination` parameter can be:
- A specific file path in the workspace
- A directory path ending with `/`, in which case the original filename is preserved
Workspace Paths
When working with workspace paths:
- Use forward slashes (`/`) regardless of your operating system
- End directory paths with a trailing slash (`/`)
- Use relative paths from the workspace root
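For instance, with the upload function shown above (illustrative paths only):

```python
# Directory destination: the trailing slash keeps the original filename
upload_workspace_file(api_client, organization_id, workspace_id, "./out/report.csv", "results/", overwrite=True)

# File destination: the file is stored under the given name
upload_workspace_file(api_client, organization_id, workspace_id, "./out/report.csv", "results/report_v2.csv", overwrite=True)
```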
Twin Data Layer Operations
The Twin Data Layer (TDL) is a graph database that stores nodes and relationships. CoAL provides tools for working with the TDL, particularly for preparing and sending CSV data.
Twin Data Layer operations:

```python
# Example: Working with the Twin Data Layer in the CosmoTech API
import os
import pathlib
import csv

from cosmotech.coal.cosmotech_api.connection import get_api_client
from cosmotech.coal.cosmotech_api.twin_data_layer import CSVSourceFile
from cosmotech_api.api.twin_graph_api import TwinGraphApi
from cosmotech.coal.utils.logger import LOGGER

# Set up environment variables for authentication
os.environ["CSM_API_URL"] = "https://api.cosmotech.com"  # Replace with your API URL
os.environ["CSM_API_KEY"] = "your-api-key"  # Replace with your actual API key

# Organization, workspace, and twin graph IDs
organization_id = "your-organization-id"  # Replace with your organization ID
workspace_id = "your-workspace-id"  # Replace with your workspace ID
twin_graph_id = "your-twin-graph-id"  # Replace with your twin graph ID

# Get the API client
api_client, connection_type = get_api_client()
LOGGER.info(f"Connected using: {connection_type}")

try:
    # Create a TwinGraphApi instance
    twin_graph_api = TwinGraphApi(api_client)

    # Example 1: Create sample CSV files for nodes and relationships
    # Create a directory for our sample data
    data_dir = pathlib.Path("./tdl_sample_data")
    data_dir.mkdir(exist_ok=True, parents=True)

    # Create a sample nodes CSV file (Person nodes)
    persons_file = data_dir / "Person.csv"
    with open(persons_file, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["id", "name", "age", "city"])
        writer.writerow(["p1", "Alice", "30", "New York"])
        writer.writerow(["p2", "Bob", "25", "San Francisco"])
        writer.writerow(["p3", "Charlie", "35", "Chicago"])

    # Create a sample relationships CSV file (KNOWS relationships)
    knows_file = data_dir / "KNOWS.csv"
    with open(knows_file, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["src", "dest", "since"])
        writer.writerow(["p1", "p2", "2020"])
        writer.writerow(["p2", "p3", "2021"])
        writer.writerow(["p3", "p1", "2019"])

    print(f"Created sample CSV files in {data_dir}")

    # Example 2: Parse CSV files and generate Cypher queries
    # Parse the nodes CSV file
    person_csv = CSVSourceFile(persons_file)
    print(f"Parsed {person_csv.object_type} CSV file:")
    print(f"  Is node: {person_csv.is_node}")
    print(f"  Fields: {person_csv.fields}")
    print(f"  ID column: {person_csv.id_column}")

    # Generate a Cypher query for creating nodes
    person_query = person_csv.generate_query_insert()
    print(f"\nGenerated Cypher query for {person_csv.object_type}:")
    print(person_query)

    # Parse the relationships CSV file
    knows_csv = CSVSourceFile(knows_file)
    print(f"\nParsed {knows_csv.object_type} CSV file:")
    print(f"  Is node: {knows_csv.is_node}")
    print(f"  Fields: {knows_csv.fields}")
    print(f"  Source column: {knows_csv.source_column}")
    print(f"  Target column: {knows_csv.target_column}")

    # Generate a Cypher query for creating relationships
    knows_query = knows_csv.generate_query_insert()
    print(f"\nGenerated Cypher query for {knows_csv.object_type}:")
    print(knows_query)

    # Example 3: Send data to the Twin Data Layer (commented out as it requires an actual twin graph)
    """
    # For nodes, you would typically:
    with open(persons_file, "r") as f:
        reader = csv.DictReader(f)
        for row in reader:
            # Create parameters for the Cypher query
            params = {k: v for k, v in row.items()}
            # Execute the query
            twin_graph_api.run_twin_graph_cypher_query(
                organization_id=organization_id,
                workspace_id=workspace_id,
                twin_graph_id=twin_graph_id,
                twin_graph_cypher_query={
                    "query": person_query,
                    "parameters": params,
                },
            )

    # For relationships, you would typically:
    with open(knows_file, "r") as f:
        reader = csv.DictReader(f)
        for row in reader:
            # Create parameters for the Cypher query
            params = {k: v for k, v in row.items()}
            # Execute the query
            twin_graph_api.run_twin_graph_cypher_query(
                organization_id=organization_id,
                workspace_id=workspace_id,
                twin_graph_id=twin_graph_id,
                twin_graph_cypher_query={
                    "query": knows_query,
                    "parameters": params,
                },
            )
    """

    # Example 4: Query data from the Twin Data Layer (commented out as it requires an actual twin graph)
    """
    # Execute a Cypher query to get all Person nodes
    result = twin_graph_api.run_twin_graph_cypher_query(
        organization_id=organization_id,
        workspace_id=workspace_id,
        twin_graph_id=twin_graph_id,
        twin_graph_cypher_query={
            "query": "MATCH (p:Person) RETURN p.id, p.name, p.age, p.city",
            "parameters": {},
        },
    )

    # Process the results
    print("\nPerson nodes in the Twin Data Layer:")
    for record in result.records:
        print(f"  - {record}")
    """
finally:
    # Always close the API client when done
    api_client.close()
```
The TDL expects CSV files in a specific format:
- Node files: Must have an `id` column and can have additional property columns
- Relationship files: Must have `src` and `dest` columns and can have additional property columns

The filename (without the `.csv` extension) becomes the node label or relationship type in the graph.
Parsing CSV Files
The `CSVSourceFile` class helps parse CSV files and determine if they represent nodes or relationships:

```python
csv_file = CSVSourceFile(file_path)
print(f"Is node: {csv_file.is_node}")
print(f"Fields: {csv_file.fields}")
```
Generating Cypher Queries
The `generate_query_insert` method creates Cypher queries for inserting data into the TDL:

```python
query = csv_file.generate_query_insert()
```

These queries can then be executed using the `TwinGraphApi`:

```python
twin_graph_api.run_twin_graph_cypher_query(
    organization_id=organization_id,
    workspace_id=workspace_id,
    twin_graph_id=twin_graph_id,
    twin_graph_cypher_query={
        "query": query,
        "parameters": params,
    },
)
```
Node References
When creating relationships, make sure the nodes referenced by the `src` and `dest` columns already exist in the graph. Otherwise, the relationship creation will fail.
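One way to guarantee that ordering, using the helpers shown above (the directory layout is assumed):

```python
import pathlib

from cosmotech.coal.cosmotech_api.twin_data_layer import CSVSourceFile

# Parse every CSV file in the data directory, then insert node files
# before relationship files so that src/dest references resolve
sources = [CSVSourceFile(p) for p in pathlib.Path("./tdl_sample_data").glob("*.csv")]
for source in sorted(sources, key=lambda s: not s.is_node):  # nodes (is_node=True) sort first
    query = source.generate_query_insert()
    # ... execute `query` row by row with TwinGraphApi, as in the example above ...
```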
Runner and Run Management
Runners and runs are central concepts in the CosmoTech platform. CoAL provides functions for working with runner data, parameters, and associated datasets.
Runner operations:

```python
# Example: Working with runners and runs in the CosmoTech API
import os
import pathlib

from cosmotech.coal.cosmotech_api.connection import get_api_client
from cosmotech.coal.cosmotech_api.runner import (
    get_runner_data,
    get_runner_parameters,
    download_runner_data,
    download_datasets,
)
from cosmotech.coal.utils.logger import LOGGER

# Set up environment variables for authentication
os.environ["CSM_API_URL"] = "https://api.cosmotech.com"  # Replace with your API URL
os.environ["CSM_API_KEY"] = "your-api-key"  # Replace with your actual API key

# Organization, workspace, and runner IDs
organization_id = "your-organization-id"  # Replace with your organization ID
workspace_id = "your-workspace-id"  # Replace with your workspace ID
runner_id = "your-runner-id"  # Replace with your runner ID

# Get the API client
api_client, connection_type = get_api_client()
LOGGER.info(f"Connected using: {connection_type}")

try:
    # Example 1: Get runner data
    runner_data = get_runner_data(organization_id, workspace_id, runner_id)
    print(f"Runner name: {runner_data.name}")
    print(f"Runner ID: {runner_data.id}")
    print(f"Runner state: {runner_data.state}")

    # Example 2: Get runner parameters
    parameters = get_runner_parameters(runner_data)
    print("\nRunner parameters:")
    for param in parameters:
        print(f"  - {param['parameterId']}: {param['value']} (type: {param['varType']})")

    # Example 3: Download runner data (parameters and datasets)
    # Create directories for parameters and datasets
    param_dir = pathlib.Path("./runner_parameters")
    dataset_dir = pathlib.Path("./runner_datasets")
    param_dir.mkdir(exist_ok=True, parents=True)
    dataset_dir.mkdir(exist_ok=True, parents=True)

    # Download runner data
    result = download_runner_data(
        organization_id=organization_id,
        workspace_id=workspace_id,
        runner_id=runner_id,
        parameter_folder=str(param_dir),
        dataset_folder=str(dataset_dir),
        read_files=True,  # Read file contents
        parallel=True,  # Download datasets in parallel
        write_json=True,  # Write parameters as JSON
        write_csv=True,  # Write parameters as CSV
        fetch_dataset=True,  # Fetch datasets
    )
    print("\nDownloaded runner data:")
    print(f"  - Parameters saved to: {param_dir}")
    print(f"  - Datasets saved to: {dataset_dir}")

    # Example 4: Working with specific datasets
    if result["datasets"]:
        print("\nDatasets associated with the runner:")
        for dataset_id, dataset_info in result["datasets"].items():
            print(f"  - Dataset ID: {dataset_id}")
            print(f"    Name: {dataset_info.get('name', 'N/A')}")
            # List files in the dataset
            if "files" in dataset_info:
                print("    Files:")
                for file_info in dataset_info["files"]:
                    print(f"      - {file_info.get('name', 'N/A')}")
    else:
        print("\nNo datasets associated with this runner.")

    # Example 5: Download specific datasets
    """
    from cosmotech.coal.cosmotech_api.runner import get_dataset_ids_from_runner

    # Get dataset IDs from the runner
    dataset_ids = get_dataset_ids_from_runner(runner_data)

    if dataset_ids:
        # Create a directory for the datasets
        specific_dataset_dir = pathlib.Path("./specific_datasets")
        specific_dataset_dir.mkdir(exist_ok=True, parents=True)

        # Download the datasets
        datasets = download_datasets(
            organization_id=organization_id,
            workspace_id=workspace_id,
            dataset_ids=dataset_ids,
            read_files=True,
            parallel=True,
        )
        print("\nDownloaded specific datasets:")
        for dataset_id, dataset_info in datasets.items():
            print(f"  - Dataset ID: {dataset_id}")
            print(f"    Name: {dataset_info.get('name', 'N/A')}")
    """
finally:
    # Always close the API client when done
    api_client.close()
```
Getting Runner Data
The `get_runner_data` function retrieves information about a runner:

```python
runner_data = get_runner_data(organization_id, workspace_id, runner_id)
```
Working with Parameters
The `get_runner_parameters` function extracts parameters from runner data:

```python
parameters = get_runner_parameters(runner_data)
```
Downloading Runner Data
The `download_runner_data` function downloads all data associated with a runner, including parameters and datasets:

```python
result = download_runner_data(
    organization_id=organization_id,
    workspace_id=workspace_id,
    runner_id=runner_id,
    parameter_folder=str(param_dir),
    dataset_folder=str(dataset_dir),
    write_json=True,
    write_csv=True,
    fetch_dataset=True,
)
```
This function:
- Downloads parameters and writes them as JSON and/or CSV files
- Downloads associated datasets
- Organizes everything in the specified directories
Dataset References
Runners can reference datasets in two ways:
- Through parameters with the `%DATASETID%` variable type
- Through the `dataset_list` property

The `download_runner_data` function handles both types of references.
Complete Workflow Example
Putting it all together, here's a complete workflow that demonstrates how to use the CosmoTech API for a data processing pipeline:
Complete workflow:

```python
# Example: Complete workflow using the CosmoTech API
import os
import pathlib
import json
import csv

from cosmotech.coal.cosmotech_api.connection import get_api_client
from cosmotech.coal.cosmotech_api.runner import (
    get_runner_data,
    download_runner_data,
)
from cosmotech.coal.cosmotech_api.workspace import (
    list_workspace_files,
    download_workspace_file,
    upload_workspace_file,
)
from cosmotech.coal.cosmotech_api.twin_data_layer import CSVSourceFile
from cosmotech_api.api.twin_graph_api import TwinGraphApi
from cosmotech_api.api.dataset_api import DatasetApi
from cosmotech.coal.utils.logger import LOGGER

# Set up environment variables for authentication
os.environ["CSM_API_URL"] = "https://api.cosmotech.com"  # Replace with your API URL
os.environ["CSM_API_KEY"] = "your-api-key"  # Replace with your actual API key

# Organization, workspace, runner, and twin graph IDs
organization_id = "your-organization-id"  # Replace with your organization ID
workspace_id = "your-workspace-id"  # Replace with your workspace ID
runner_id = "your-runner-id"  # Replace with your runner ID
twin_graph_id = "your-twin-graph-id"  # Replace with your twin graph ID

# Create directories for our workflow
workflow_dir = pathlib.Path("./workflow_example")
workflow_dir.mkdir(exist_ok=True, parents=True)
input_dir = workflow_dir / "input"
processed_dir = workflow_dir / "processed"
output_dir = workflow_dir / "output"
input_dir.mkdir(exist_ok=True, parents=True)
processed_dir.mkdir(exist_ok=True, parents=True)
output_dir.mkdir(exist_ok=True, parents=True)

# Get the API client
api_client, connection_type = get_api_client()
LOGGER.info(f"Connected using: {connection_type}")

try:
    # Step 1: Download runner data (parameters and datasets)
    print("\n=== Step 1: Download Runner Data ===")
    runner_data = get_runner_data(organization_id, workspace_id, runner_id)
    print(f"Runner name: {runner_data.name}")

    result = download_runner_data(
        organization_id=organization_id,
        workspace_id=workspace_id,
        runner_id=runner_id,
        parameter_folder=str(input_dir / "parameters"),
        dataset_folder=str(input_dir / "datasets"),
        write_json=True,
        write_csv=True,
    )
    print(f"Downloaded runner data to {input_dir}")

    # Step 2: Process the data
    print("\n=== Step 2: Process Data ===")
    # For this example, we'll create a simple transformation:
    # - Read a CSV file from the input
    # - Transform it
    # - Write the result to the processed directory

    # Let's assume we have a "customers.csv" file in the input directory
    customers_file = input_dir / "datasets" / "customers.csv"

    # If the file doesn't exist for this example, create a sample one
    if not customers_file.exists():
        print("Creating sample customers.csv file for demonstration")
        customers_file.parent.mkdir(exist_ok=True, parents=True)
        with open(customers_file, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["id", "name", "age", "city", "spending"])
            writer.writerow(["c1", "Alice", "30", "New York", "1500"])
            writer.writerow(["c2", "Bob", "25", "San Francisco", "2000"])
            writer.writerow(["c3", "Charlie", "35", "Chicago", "1200"])

    # Read the customers data
    customers = []
    with open(customers_file, "r") as f:
        reader = csv.DictReader(f)
        for row in reader:
            customers.append(row)
    print(f"Read {len(customers)} customers from {customers_file}")

    # Process the data: calculate a loyalty score based on age and spending
    for customer in customers:
        age = int(customer["age"])
        spending = int(customer["spending"])
        # Simple formula: loyalty score = spending / 100 + (age - 20) / 10
        loyalty_score = round(spending / 100 + (age - 20) / 10, 1)
        customer["loyalty_score"] = str(loyalty_score)

    # Write the processed data
    processed_file = processed_dir / "customers_with_loyalty.csv"
    with open(processed_file, "w", newline="") as f:
        fieldnames = ["id", "name", "age", "city", "spending", "loyalty_score"]
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(customers)
    print(f"Processed data written to {processed_file}")

    # Step 3: Upload the processed file to the workspace
    print("\n=== Step 3: Upload Processed Data to Workspace ===")
    try:
        uploaded_file = upload_workspace_file(
            api_client,
            organization_id,
            workspace_id,
            str(processed_file),
            "processed_data/",  # Destination in the workspace
            overwrite=True,
        )
        print(f"Uploaded processed file as: {uploaded_file}")
    except Exception as e:
        print(f"Error uploading file: {e}")

    # Step 4: Create a dataset from the processed data
    print("\n=== Step 4: Create Dataset from Processed Data ===")
    # This step would typically involve:
    # 1. Creating a dataset in the CosmoTech API
    # 2. Uploading files to the dataset
    """
    # Create a dataset
    dataset_api = DatasetApi(api_client)
    new_dataset = {
        "name": "Customers with Loyalty Scores",
        "description": "Processed customer data with calculated loyalty scores",
        "tags": ["processed", "customers", "loyalty"],
    }
    try:
        dataset = dataset_api.create_dataset(
            organization_id=organization_id,
            workspace_id=workspace_id,
            dataset=new_dataset,
        )
        dataset_id = dataset.id
        print(f"Created dataset with ID: {dataset_id}")

        # Upload the processed file to the dataset
        # This would typically involve additional API calls
        # ...
    except Exception as e:
        print(f"Error creating dataset: {e}")
    """

    # Step 5: Send data to the Twin Data Layer
    print("\n=== Step 5: Send Data to Twin Data Layer ===")
    # Parse the processed CSV file for the Twin Data Layer
    customer_csv = CSVSourceFile(processed_file)

    # Generate a Cypher query for creating nodes
    customer_query = customer_csv.generate_query_insert()
    print("Generated Cypher query for Customer nodes:")
    print(customer_query)

    # In a real scenario, you would send this data to the Twin Data Layer
    """
    twin_graph_api = TwinGraphApi(api_client)

    # For each customer, create a node in the Twin Data Layer
    with open(processed_file, "r") as f:
        reader = csv.DictReader(f)
        for row in reader:
            # Create parameters for the Cypher query
            params = {k: v for k, v in row.items()}
            # Execute the query
            twin_graph_api.run_twin_graph_cypher_query(
                organization_id=organization_id,
                workspace_id=workspace_id,
                twin_graph_id=twin_graph_id,
                twin_graph_cypher_query={
                    "query": customer_query,
                    "parameters": params,
                },
            )
    """

    # Step 6: Generate a report
    print("\n=== Step 6: Generate Report ===")
    # Calculate some statistics
    total_customers = len(customers)
    avg_age = sum(int(c["age"]) for c in customers) / total_customers
    avg_spending = sum(int(c["spending"]) for c in customers) / total_customers
    avg_loyalty = sum(float(c["loyalty_score"]) for c in customers) / total_customers

    # Create a report
    report = {
        "report_date": "2025-02-28",
        "runner_id": runner_id,
        "statistics": {
            "total_customers": total_customers,
            "average_age": round(avg_age, 1),
            "average_spending": round(avg_spending, 2),
            "average_loyalty_score": round(avg_loyalty, 1),
        },
        "top_customers": sorted(
            customers, key=lambda c: float(c["loyalty_score"]), reverse=True
        )[:2],  # Top 2 customers by loyalty score
    }

    # Write the report to a JSON file
    report_file = output_dir / "customer_report.json"
    with open(report_file, "w") as f:
        json.dump(report, f, indent=2)
    print(f"Report generated and saved to {report_file}")

    # Print a summary of the report
    print("\nReport Summary:")
    print(f"Total Customers: {report['statistics']['total_customers']}")
    print(f"Average Age: {report['statistics']['average_age']}")
    print(f"Average Spending: {report['statistics']['average_spending']}")
    print(f"Average Loyalty Score: {report['statistics']['average_loyalty_score']}")

    print("\nTop Customers by Loyalty Score:")
    for i, customer in enumerate(report["top_customers"], 1):
        print(f"{i}. {customer['name']} (Score: {customer['loyalty_score']})")

    print("\nWorkflow completed successfully!")
finally:
    # Always close the API client when done
    api_client.close()
```
This workflow:
- Downloads runner data (parameters and datasets)
- Processes the data (calculates loyalty scores for customers)
- Uploads the processed data to the workspace
- Prepares the data for the Twin Data Layer
- Generates a report with statistics and insights
Real-world Workflows
In real-world scenarios, you might:
- Use more complex data transformations
- Integrate with external systems
- Implement error handling and retries
- Add logging and monitoring
- Parallelize operations for better performance
Best Practices and Tips
Authentication
- Use environment variables for credentials
- Implement proper secret management in production
- Always close API clients when done
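For example, a minimal sketch of failing fast when credentials are missing (using the environment variables documented above):

```python
import os

# Read configuration from the environment instead of hardcoding it
api_url = os.environ.get("CSM_API_URL")
api_key = os.environ.get("CSM_API_KEY")
if not (api_url and api_key):
    raise RuntimeError("CSM_API_URL and CSM_API_KEY must be set before connecting")
```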
Error Handling
```python
import cosmotech_api

try:
    # API operations go here
    ...
except cosmotech_api.exceptions.ApiException as e:
    # Handle API errors
    print(f"API error: {e.status} - {e.reason}")
except Exception as e:
    # Handle other errors
    print(f"Error: {e}")
finally:
    # Always close the client
    api_client.close()
```
Performance
- Download datasets in parallel when possible (`parallel=True`)
- Batch operations when sending multiple items to the API
- Use appropriate error handling and retries for network operations (see the sketch below)
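A minimal retry sketch (the `with_retries` helper and its parameters are illustrative, not part of CoAL):

```python
import time

from cosmotech_api.exceptions import ApiException


def with_retries(operation, attempts=3, base_delay_seconds=2.0):
    """Illustrative helper: retry a callable on transient (5xx) API errors."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except ApiException as e:
            # Give up on the last attempt, and don't retry client-side (4xx) errors
            if attempt == attempts or (e.status is not None and e.status < 500):
                raise
            time.sleep(base_delay_seconds * attempt)  # simple linear backoff


# Usage (hypothetical):
# files = with_retries(lambda: list_workspace_files(api_client, organization_id, workspace_id, "data/"))
```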
Security
- Never hardcode credentials in your code
- Use the principle of least privilege for API keys and service principals
- Validate and sanitize inputs before sending them to the API
Conclusion
The CosmoTech API integration in CoAL provides a powerful way to interact with the CosmoTech platform programmatically. By leveraging these capabilities, you can:
- Automate workflows
- Integrate with other systems
- Build custom applications
- Process and analyze data
- Create end-to-end solutions
Whether you're building data pipelines, creating custom interfaces, or integrating with existing systems, the CoAL library's API integration offers the tools you need to work effectively with the CosmoTech platform.