The CosmoTech Acceleration Library (CoAL) provides a comprehensive set of tools for interacting with the CosmoTech API. This integration allows you to:
- Authenticate with different identity providers
- Manage workspaces and files
- Work with the Twin Data Layer for graph data
- Handle runners and runs
- Process and transform data
- Build end-to-end workflows
The API integration is organized into several modules, each focused on specific functionality:
- connection: Authentication and API client management
- workspace: Workspace file operations
- twin_data_layer: Graph data management
- runner: Runner and run data operations
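For instance, a script usually pulls just the helpers it needs from these modules. The sketch below uses only import paths that appear in the examples on this page:

```python
# A minimal sketch of how the modules fit together; these are the same
# import paths used throughout the examples below.
from cosmotech.coal.cosmotech_api.connection import get_api_client
from cosmotech.coal.cosmotech_api.workspace import list_workspace_files
from cosmotech.coal.cosmotech_api.twin_data_layer import CSVSourceFile
from cosmotech.coal.cosmotech_api.runner import get_runner_data
```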
API vs CLI
While the csm-data CLI provides command-line tools for many common operations, the direct API integration offers more flexibility and programmatic control. Use the API integration when you need to:
- Build custom workflows
- Integrate with other Python code
- Perform complex operations not covered by the CLI
- Implement real-time interactions with the platform
```python
# Example: Setting up connections to the CosmoTech API
import os

from cosmotech.coal.cosmotech_api.connection import get_api_client
from cosmotech.coal.utils.logger import LOGGER

# Method 1: Using API Key (set these environment variables before running)
os.environ["CSM_API_URL"] = "https://api.cosmotech.com"  # Replace with your API URL
os.environ["CSM_API_KEY"] = "your-api-key"  # Replace with your actual API key

# Get the API client
api_client, connection_type = get_api_client()
LOGGER.info(f"Connected using: {connection_type}")

# Use the client with various API instances
from cosmotech_api.api.organization_api import OrganizationApi

org_api = OrganizationApi(api_client)

# List organizations
organizations = org_api.find_all_organizations()
for org in organizations:
    print(f"Organization: {org.name} (ID: {org.id})")

# Don't forget to close the client when done
api_client.close()

# Method 2: Using Azure Entra (set these environment variables before running)
"""
os.environ["CSM_API_URL"] = "https://api.cosmotech.com"  # Replace with your API URL
os.environ["CSM_API_SCOPE"] = "api://your-app-id/.default"  # Replace with your API scope
os.environ["AZURE_CLIENT_ID"] = "your-client-id"  # Replace with your client ID
os.environ["AZURE_CLIENT_SECRET"] = "your-client-secret"  # Replace with your client secret
os.environ["AZURE_TENANT_ID"] = "your-tenant-id"  # Replace with your tenant ID

# Get the API client
api_client, connection_type = get_api_client()
LOGGER.info(f"Connected using: {connection_type}")

# Use the client with various API instances
# ...

# Don't forget to close the client when done
api_client.close()
"""

# Method 3: Using Keycloak (set these environment variables before running)
"""
os.environ["CSM_API_URL"] = "https://api.cosmotech.com"  # Replace with your API URL
os.environ["IDP_BASE_URL"] = "https://keycloak.example.com/auth/"  # Replace with your Keycloak URL
os.environ["IDP_TENANT_ID"] = "your-realm"  # Replace with your realm
os.environ["IDP_CLIENT_ID"] = "your-client-id"  # Replace with your client ID
os.environ["IDP_CLIENT_SECRET"] = "your-client-secret"  # Replace with your client secret

# Get the API client
api_client, connection_type = get_api_client()
LOGGER.info(f"Connected using: {connection_type}")

# Use the client with various API instances
# ...

# Don't forget to close the client when done
api_client.close()
"""
```
Environment Variables
You can set environment variables in your code for testing, but in production environments, it's better to set them at the system or container level for security.
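For example, a short startup guard makes missing configuration fail fast instead of surfacing later as an opaque authentication error. This is a minimal sketch; adapt the variable list to the authentication method you use:

```python
import os

# A minimal sketch: fail fast when required configuration is absent.
# Adjust the list for your identity provider (API key shown here).
required_vars = ["CSM_API_URL", "CSM_API_KEY"]
missing = [name for name in required_vars if not os.environ.get(name)]
if missing:
    raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")
```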
Keycloak authentication requires these environment variables:
- CSM_API_URL: The URL of the CosmoTech API
- IDP_BASE_URL: The base URL of your Keycloak server
- IDP_TENANT_ID: Your realm name
- IDP_CLIENT_ID: Your client ID
- IDP_CLIENT_SECRET: Your client secret
API Client Lifecycle
Always close the API client when you're done using it to release resources. The best practice is to use a try/finally block to ensure the client is closed even if an error occurs.
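Using get_api_client from the examples above, the pattern looks like this (a minimal sketch):

```python
from cosmotech.coal.cosmotech_api.connection import get_api_client

api_client, connection_type = get_api_client()
try:
    ...  # your API calls go here
finally:
    # Runs even if an API call raises, so the client is always closed
    api_client.close()
```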
Workspaces in the CosmoTech platform provide a way to organize and share files. The CoAL library offers functions for listing, downloading, and uploading files in workspaces.
```python
# Example: Working with workspaces in the CosmoTech API
import os
import pathlib

from cosmotech.coal.cosmotech_api.connection import get_api_client
from cosmotech.coal.cosmotech_api.workspace import (
    list_workspace_files,
    download_workspace_file,
    upload_workspace_file,
)
from cosmotech.coal.utils.logger import LOGGER

# Set up environment variables for authentication
os.environ["CSM_API_URL"] = "https://api.cosmotech.com"  # Replace with your API URL
os.environ["CSM_API_KEY"] = "your-api-key"  # Replace with your actual API key

# Organization and workspace IDs
organization_id = "your-organization-id"  # Replace with your organization ID
workspace_id = "your-workspace-id"  # Replace with your workspace ID

# Get the API client
api_client, connection_type = get_api_client()
LOGGER.info(f"Connected using: {connection_type}")

try:
    # Example 1: List files in a workspace with a specific prefix
    file_prefix = "data/"  # List files in the "data" directory
    try:
        files = list_workspace_files(api_client, organization_id, workspace_id, file_prefix)
        print(f"Files in workspace with prefix '{file_prefix}':")
        for file in files:
            print(f"  - {file}")
    except ValueError as e:
        print(f"Error listing files: {e}")

    # Example 2: Download a file from the workspace
    file_to_download = "data/sample.csv"  # Replace with an actual file in your workspace
    target_directory = pathlib.Path("./downloaded_files")
    target_directory.mkdir(exist_ok=True, parents=True)
    try:
        downloaded_file = download_workspace_file(
            api_client,
            organization_id,
            workspace_id,
            file_to_download,
            target_directory,
        )
        print(f"Downloaded file to: {downloaded_file}")
    except Exception as e:
        print(f"Error downloading file: {e}")

    # Example 3: Upload a file to the workspace
    file_to_upload = "./local_data/upload_sample.csv"  # Replace with a local file path
    workspace_destination = "data/uploaded/"  # Destination in the workspace (ending with / to keep filename)
    try:
        uploaded_file = upload_workspace_file(
            api_client,
            organization_id,
            workspace_id,
            file_to_upload,
            workspace_destination,
            overwrite=True,  # Set to False to prevent overwriting existing files
        )
        print(f"Uploaded file as: {uploaded_file}")
    except Exception as e:
        print(f"Error uploading file: {e}")
finally:
    # Always close the API client when done
    api_client.close()
```
The workspace_destination parameter can be:
- A specific file path in the workspace
- A directory path ending with /, in which case the original filename is preserved
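For example (a sketch; the file and directory names are placeholders, and api_client, organization_id, and workspace_id come from the setup shown earlier):

```python
from cosmotech.coal.cosmotech_api.workspace import upload_workspace_file

# 1. A full file path in the workspace: the file is stored under that exact name.
upload_workspace_file(api_client, organization_id, workspace_id,
                      "./local_data/report.csv", "data/renamed_report.csv", overwrite=True)

# 2. A directory path ending with "/": the original filename is preserved,
#    so this stores the file as "data/uploads/report.csv".
upload_workspace_file(api_client, organization_id, workspace_id,
                      "./local_data/report.csv", "data/uploads/", overwrite=True)
```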
Workspace Paths
When working with workspace paths, use forward slashes (/) regardless of your operating system.
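One way to guarantee this is to build workspace paths with the standard library's PurePosixPath, which always joins with forward slashes (a minimal sketch):

```python
from pathlib import PurePosixPath

# PurePosixPath joins with "/" even on Windows, making it a safe way
# to construct workspace paths portably.
destination = str(PurePosixPath("data") / "uploads" / "sample.csv")
print(destination)  # data/uploads/sample.csv
```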
The Twin Data Layer (TDL) is a graph database that stores nodes and relationships. CoAL provides tools for working with the TDL, particularly for preparing and sending CSV data.
```python
# Example: Working with the Twin Data Layer in the CosmoTech API
import os
import pathlib
import csv

from cosmotech.coal.cosmotech_api.connection import get_api_client
from cosmotech.coal.cosmotech_api.twin_data_layer import CSVSourceFile
from cosmotech_api.api.twin_graph_api import TwinGraphApi
from cosmotech.coal.utils.logger import LOGGER

# Set up environment variables for authentication
os.environ["CSM_API_URL"] = "https://api.cosmotech.com"  # Replace with your API URL
os.environ["CSM_API_KEY"] = "your-api-key"  # Replace with your actual API key

# Organization and workspace IDs
organization_id = "your-organization-id"  # Replace with your organization ID
workspace_id = "your-workspace-id"  # Replace with your workspace ID
twin_graph_id = "your-twin-graph-id"  # Replace with your twin graph ID

# Get the API client
api_client, connection_type = get_api_client()
LOGGER.info(f"Connected using: {connection_type}")

try:
    # Create a TwinGraphApi instance
    twin_graph_api = TwinGraphApi(api_client)

    # Example 1: Create sample CSV files for nodes and relationships
    # Create a directory for our sample data
    data_dir = pathlib.Path("./tdl_sample_data")
    data_dir.mkdir(exist_ok=True, parents=True)

    # Create a sample nodes CSV file (Person nodes)
    persons_file = data_dir / "Person.csv"
    with open(persons_file, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["id", "name", "age", "city"])
        writer.writerow(["p1", "Alice", "30", "New York"])
        writer.writerow(["p2", "Bob", "25", "San Francisco"])
        writer.writerow(["p3", "Charlie", "35", "Chicago"])

    # Create a sample relationships CSV file (KNOWS relationships)
    knows_file = data_dir / "KNOWS.csv"
    with open(knows_file, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["src", "dest", "since"])
        writer.writerow(["p1", "p2", "2020"])
        writer.writerow(["p2", "p3", "2021"])
        writer.writerow(["p3", "p1", "2019"])

    print(f"Created sample CSV files in {data_dir}")

    # Example 2: Parse CSV files and generate Cypher queries
    # Parse the nodes CSV file
    person_csv = CSVSourceFile(persons_file)
    print(f"Parsed {person_csv.object_type} CSV file:")
    print(f"  Is node: {person_csv.is_node}")
    print(f"  Fields: {person_csv.fields}")
    print(f"  ID column: {person_csv.id_column}")

    # Generate a Cypher query for creating nodes
    person_query = person_csv.generate_query_insert()
    print(f"\nGenerated Cypher query for {person_csv.object_type}:")
    print(person_query)

    # Parse the relationships CSV file
    knows_csv = CSVSourceFile(knows_file)
    print(f"\nParsed {knows_csv.object_type} CSV file:")
    print(f"  Is node: {knows_csv.is_node}")
    print(f"  Fields: {knows_csv.fields}")
    print(f"  Source column: {knows_csv.source_column}")
    print(f"  Target column: {knows_csv.target_column}")

    # Generate a Cypher query for creating relationships
    knows_query = knows_csv.generate_query_insert()
    print(f"\nGenerated Cypher query for {knows_csv.object_type}:")
    print(knows_query)

    # Example 3: Send data to the Twin Data Layer (commented out as it requires an actual twin graph)
    """
    # For nodes, you would typically:
    with open(persons_file, "r") as f:
        reader = csv.DictReader(f)
        for row in reader:
            # Create parameters for the Cypher query
            params = {k: v for k, v in row.items()}
            # Execute the query
            twin_graph_api.run_twin_graph_cypher_query(
                organization_id=organization_id,
                workspace_id=workspace_id,
                twin_graph_id=twin_graph_id,
                twin_graph_cypher_query={"query": person_query, "parameters": params},
            )

    # For relationships, you would typically:
    with open(knows_file, "r") as f:
        reader = csv.DictReader(f)
        for row in reader:
            # Create parameters for the Cypher query
            params = {k: v for k, v in row.items()}
            # Execute the query
            twin_graph_api.run_twin_graph_cypher_query(
                organization_id=organization_id,
                workspace_id=workspace_id,
                twin_graph_id=twin_graph_id,
                twin_graph_cypher_query={"query": knows_query, "parameters": params},
            )
    """

    # Example 4: Query data from the Twin Data Layer (commented out as it requires an actual twin graph)
    """
    # Execute a Cypher query to get all Person nodes
    result = twin_graph_api.run_twin_graph_cypher_query(
        organization_id=organization_id,
        workspace_id=workspace_id,
        twin_graph_id=twin_graph_id,
        twin_graph_cypher_query={
            "query": "MATCH (p:Person) RETURN p.id, p.name, p.age, p.city",
            "parameters": {},
        },
    )

    # Process the results
    print("\nPerson nodes in the Twin Data Layer:")
    for record in result.records:
        print(f"  - {record}")
    """
finally:
    # Always close the API client when done
    api_client.close()
```
When creating relationships, make sure the nodes referenced by the src and dest columns already exist in the graph. Otherwise, the relationship creation will fail.
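A simple way to enforce that ordering is to load every node file before any relationship file. The sketch below reuses CSVSourceFile and the data_dir from the example above, and elides the per-row query execution:

```python
# A sketch: load node files before relationship files so that every
# "src"/"dest" reference points at a node that already exists.
source_files = [CSVSourceFile(path) for path in data_dir.glob("*.csv")]
node_files = [sf for sf in source_files if sf.is_node]
relationship_files = [sf for sf in source_files if not sf.is_node]

for sf in node_files + relationship_files:
    query = sf.generate_query_insert()
    ...  # send each row with twin_graph_api.run_twin_graph_cypher_query, as above
```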
Runners and runs are central concepts in the CosmoTech platform. CoAL provides functions for working with runner data, parameters, and associated datasets.
```python
# Example: Working with runners and runs in the CosmoTech API
import os
import pathlib

from cosmotech.coal.cosmotech_api.connection import get_api_client
from cosmotech.coal.cosmotech_api.runner import (
    get_runner_data,
    get_runner_parameters,
    download_runner_data,
    download_datasets,
)
from cosmotech.coal.utils.logger import LOGGER

# Set up environment variables for authentication
os.environ["CSM_API_URL"] = "https://api.cosmotech.com"  # Replace with your API URL
os.environ["CSM_API_KEY"] = "your-api-key"  # Replace with your actual API key

# Organization, workspace, and runner IDs
organization_id = "your-organization-id"  # Replace with your organization ID
workspace_id = "your-workspace-id"  # Replace with your workspace ID
runner_id = "your-runner-id"  # Replace with your runner ID

# Get the API client
api_client, connection_type = get_api_client()
LOGGER.info(f"Connected using: {connection_type}")

try:
    # Example 1: Get runner data
    runner_data = get_runner_data(organization_id, workspace_id, runner_id)
    print(f"Runner name: {runner_data.name}")
    print(f"Runner ID: {runner_data.id}")
    print(f"Runner state: {runner_data.state}")

    # Example 2: Get runner parameters
    parameters = get_runner_parameters(runner_data)
    print("\nRunner parameters:")
    for param in parameters:
        print(f"  - {param['parameterId']}: {param['value']} (type: {param['varType']})")

    # Example 3: Download runner data (parameters and datasets)
    # Create directories for parameters and datasets
    param_dir = pathlib.Path("./runner_parameters")
    dataset_dir = pathlib.Path("./runner_datasets")
    param_dir.mkdir(exist_ok=True, parents=True)
    dataset_dir.mkdir(exist_ok=True, parents=True)

    # Download runner data
    result = download_runner_data(
        organization_id=organization_id,
        workspace_id=workspace_id,
        runner_id=runner_id,
        parameter_folder=str(param_dir),
        dataset_folder=str(dataset_dir),
        read_files=True,  # Read file contents
        parallel=True,  # Download datasets in parallel
        write_json=True,  # Write parameters as JSON
        write_csv=True,  # Write parameters as CSV
        fetch_dataset=True,  # Fetch datasets
    )
    print("\nDownloaded runner data:")
    print(f"  - Parameters saved to: {param_dir}")
    print(f"  - Datasets saved to: {dataset_dir}")

    # Example 4: Working with specific datasets
    if result["datasets"]:
        print("\nDatasets associated with the runner:")
        for dataset_id, dataset_info in result["datasets"].items():
            print(f"  - Dataset ID: {dataset_id}")
            print(f"    Name: {dataset_info.get('name', 'N/A')}")
            # List files in the dataset
            if "files" in dataset_info:
                print("    Files:")
                for file_info in dataset_info["files"]:
                    print(f"      - {file_info.get('name', 'N/A')}")
    else:
        print("\nNo datasets associated with this runner.")

    # Example 5: Download specific datasets
    """
    from cosmotech.coal.cosmotech_api.runner import get_dataset_ids_from_runner

    # Get dataset IDs from the runner
    dataset_ids = get_dataset_ids_from_runner(runner_data)

    if dataset_ids:
        # Create a directory for the datasets
        specific_dataset_dir = pathlib.Path("./specific_datasets")
        specific_dataset_dir.mkdir(exist_ok=True, parents=True)

        # Download the datasets
        datasets = download_datasets(
            organization_id=organization_id,
            workspace_id=workspace_id,
            dataset_ids=dataset_ids,
            read_files=True,
            parallel=True,
        )

        print("\nDownloaded specific datasets:")
        for dataset_id, dataset_info in datasets.items():
            print(f"  - Dataset ID: {dataset_id}")
            print(f"    Name: {dataset_info.get('name', 'N/A')}")
    """
finally:
    # Always close the API client when done
    api_client.close()
```
The download_runner_data function:
- Downloads parameters and writes them as JSON and/or CSV files
- Downloads associated datasets
- Organizes everything in the specified directories
Dataset References
Runners can reference datasets in two ways:
1. Through parameters with the %DATASETID% variable type
2. Through the dataset_list property
The download_runner_data function handles both types of references.
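For example, to inspect which datasets a runner references before downloading anything, you can combine get_runner_data with get_dataset_ids_from_runner (a minimal sketch; both helpers appear in the runner example above, and the IDs come from the setup shown earlier):

```python
from cosmotech.coal.cosmotech_api.runner import (
    get_runner_data,
    get_dataset_ids_from_runner,
)

# organization_id, workspace_id, and runner_id as in the examples above
runner_data = get_runner_data(organization_id, workspace_id, runner_id)

# Collects dataset IDs from both %DATASETID% parameters and dataset_list
dataset_ids = get_dataset_ids_from_runner(runner_data)
print(f"Runner references {len(dataset_ids)} dataset(s): {dataset_ids}")
```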
```python
# Example: Complete workflow using the CosmoTech API
import os
import pathlib
import json
import csv

from cosmotech.coal.cosmotech_api.connection import get_api_client
from cosmotech.coal.cosmotech_api.runner import (
    get_runner_data,
    download_runner_data,
)
from cosmotech.coal.cosmotech_api.workspace import (
    list_workspace_files,
    download_workspace_file,
    upload_workspace_file,
)
from cosmotech.coal.cosmotech_api.twin_data_layer import CSVSourceFile
from cosmotech_api.api.twin_graph_api import TwinGraphApi
from cosmotech_api.api.dataset_api import DatasetApi
from cosmotech.coal.utils.logger import LOGGER

# Set up environment variables for authentication
os.environ["CSM_API_URL"] = "https://api.cosmotech.com"  # Replace with your API URL
os.environ["CSM_API_KEY"] = "your-api-key"  # Replace with your actual API key

# Organization, workspace, and runner IDs
organization_id = "your-organization-id"  # Replace with your organization ID
workspace_id = "your-workspace-id"  # Replace with your workspace ID
runner_id = "your-runner-id"  # Replace with your runner ID
twin_graph_id = "your-twin-graph-id"  # Replace with your twin graph ID

# Create directories for our workflow
workflow_dir = pathlib.Path("./workflow_example")
workflow_dir.mkdir(exist_ok=True, parents=True)
input_dir = workflow_dir / "input"
processed_dir = workflow_dir / "processed"
output_dir = workflow_dir / "output"
input_dir.mkdir(exist_ok=True, parents=True)
processed_dir.mkdir(exist_ok=True, parents=True)
output_dir.mkdir(exist_ok=True, parents=True)

# Get the API client
api_client, connection_type = get_api_client()
LOGGER.info(f"Connected using: {connection_type}")

try:
    # Step 1: Download runner data (parameters and datasets)
    print("\n=== Step 1: Download Runner Data ===")
    runner_data = get_runner_data(organization_id, workspace_id, runner_id)
    print(f"Runner name: {runner_data.name}")

    result = download_runner_data(
        organization_id=organization_id,
        workspace_id=workspace_id,
        runner_id=runner_id,
        parameter_folder=str(input_dir / "parameters"),
        dataset_folder=str(input_dir / "datasets"),
        write_json=True,
        write_csv=True,
    )
    print(f"Downloaded runner data to {input_dir}")

    # Step 2: Process the data
    print("\n=== Step 2: Process Data ===")
    # For this example, we'll create a simple transformation:
    # - Read a CSV file from the input
    # - Transform it
    # - Write the result to the processed directory

    # Let's assume we have a "customers.csv" file in the input directory
    customers_file = input_dir / "datasets" / "customers.csv"

    # If the file doesn't exist for this example, create a sample one
    if not customers_file.exists():
        print("Creating sample customers.csv file for demonstration")
        customers_file.parent.mkdir(exist_ok=True, parents=True)
        with open(customers_file, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["id", "name", "age", "city", "spending"])
            writer.writerow(["c1", "Alice", "30", "New York", "1500"])
            writer.writerow(["c2", "Bob", "25", "San Francisco", "2000"])
            writer.writerow(["c3", "Charlie", "35", "Chicago", "1200"])

    # Read the customers data
    customers = []
    with open(customers_file, "r") as f:
        reader = csv.DictReader(f)
        for row in reader:
            customers.append(row)

    print(f"Read {len(customers)} customers from {customers_file}")

    # Process the data: calculate a loyalty score based on age and spending
    for customer in customers:
        age = int(customer["age"])
        spending = int(customer["spending"])
        # Simple formula: loyalty score = spending / 100 + (age - 20) / 10
        loyalty_score = round(spending / 100 + (age - 20) / 10, 1)
        customer["loyalty_score"] = str(loyalty_score)

    # Write the processed data
    processed_file = processed_dir / "customers_with_loyalty.csv"
    with open(processed_file, "w", newline="") as f:
        fieldnames = ["id", "name", "age", "city", "spending", "loyalty_score"]
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(customers)

    print(f"Processed data written to {processed_file}")

    # Step 3: Upload the processed file to the workspace
    print("\n=== Step 3: Upload Processed Data to Workspace ===")
    try:
        uploaded_file = upload_workspace_file(
            api_client,
            organization_id,
            workspace_id,
            str(processed_file),
            "processed_data/",  # Destination in the workspace
            overwrite=True,
        )
        print(f"Uploaded processed file as: {uploaded_file}")
    except Exception as e:
        print(f"Error uploading file: {e}")

    # Step 4: Create a dataset from the processed data
    print("\n=== Step 4: Create Dataset from Processed Data ===")
    # This step would typically involve:
    # 1. Creating a dataset in the CosmoTech API
    # 2. Uploading files to the dataset
    """
    # Create a dataset
    dataset_api = DatasetApi(api_client)

    new_dataset = {
        "name": "Customers with Loyalty Scores",
        "description": "Processed customer data with calculated loyalty scores",
        "tags": ["processed", "customers", "loyalty"],
    }

    try:
        dataset = dataset_api.create_dataset(
            organization_id=organization_id,
            workspace_id=workspace_id,
            dataset=new_dataset,
        )
        dataset_id = dataset.id
        print(f"Created dataset with ID: {dataset_id}")

        # Upload the processed file to the dataset
        # This would typically involve additional API calls
        # ...
    except Exception as e:
        print(f"Error creating dataset: {e}")
    """

    # Step 5: Send data to the Twin Data Layer
    print("\n=== Step 5: Send Data to Twin Data Layer ===")
    # Parse the processed CSV file for the Twin Data Layer
    customer_csv = CSVSourceFile(processed_file)

    # Generate a Cypher query for creating nodes
    customer_query = customer_csv.generate_query_insert()
    print("Generated Cypher query for Customer nodes:")
    print(customer_query)

    # In a real scenario, you would send this data to the Twin Data Layer
    """
    twin_graph_api = TwinGraphApi(api_client)

    # For each customer, create a node in the Twin Data Layer
    with open(processed_file, "r") as f:
        reader = csv.DictReader(f)
        for row in reader:
            # Create parameters for the Cypher query
            params = {k: v for k, v in row.items()}
            # Execute the query
            twin_graph_api.run_twin_graph_cypher_query(
                organization_id=organization_id,
                workspace_id=workspace_id,
                twin_graph_id=twin_graph_id,
                twin_graph_cypher_query={"query": customer_query, "parameters": params},
            )
    """

    # Step 6: Generate a report
    print("\n=== Step 6: Generate Report ===")
    # Calculate some statistics
    total_customers = len(customers)
    avg_age = sum(int(c["age"]) for c in customers) / total_customers
    avg_spending = sum(int(c["spending"]) for c in customers) / total_customers
    avg_loyalty = sum(float(c["loyalty_score"]) for c in customers) / total_customers

    # Create a report
    report = {
        "report_date": "2025-02-28",
        "runner_id": runner_id,
        "statistics": {
            "total_customers": total_customers,
            "average_age": round(avg_age, 1),
            "average_spending": round(avg_spending, 2),
            "average_loyalty_score": round(avg_loyalty, 1),
        },
        # Top 2 customers by loyalty score
        "top_customers": sorted(customers, key=lambda c: float(c["loyalty_score"]), reverse=True)[:2],
    }

    # Write the report to a JSON file
    report_file = output_dir / "customer_report.json"
    with open(report_file, "w") as f:
        json.dump(report, f, indent=2)

    print(f"Report generated and saved to {report_file}")

    # Print a summary of the report
    print("\nReport Summary:")
    print(f"Total Customers: {report['statistics']['total_customers']}")
    print(f"Average Age: {report['statistics']['average_age']}")
    print(f"Average Spending: {report['statistics']['average_spending']}")
    print(f"Average Loyalty Score: {report['statistics']['average_loyalty_score']}")

    print("\nTop Customers by Loyalty Score:")
    for i, customer in enumerate(report["top_customers"], 1):
        print(f"{i}. {customer['name']} (Score: {customer['loyalty_score']})")

    print("\nWorkflow completed successfully!")
finally:
    # Always close the API client when done
    api_client.close()
```
This workflow:
1. Downloads runner data (parameters and datasets)
2. Processes the data (calculates loyalty scores for customers)
3. Uploads the processed file to the workspace
4. Sketches creating a dataset from the processed data
5. Generates a Cypher query for sending the data to the Twin Data Layer
6. Generates a JSON report with summary statistics
Error Handling

Wrap your API operations so that API errors are reported with their status and reason, other errors are still caught, and the client is always closed:

```python
import cosmotech_api.exceptions

try:
    ...  # API operations go here
except cosmotech_api.exceptions.ApiException as e:
    # Handle API errors
    print(f"API error: {e.status} - {e.reason}")
except Exception as e:
    # Handle other errors
    print(f"Error: {e}")
finally:
    # Always close the client
    api_client.close()
```
The CosmoTech API integration in CoAL provides a powerful way to interact with the CosmoTech platform programmatically. By leveraging these capabilities, you can:
- Automate workflows
- Integrate with other systems
- Build custom applications
- Process and analyze data
- Create end-to-end solutions
Whether you're building data pipelines, creating custom interfaces, or integrating with existing systems, the CoAL library's API integration offers the tools you need to work effectively with the CosmoTech platform.