The datastore is a powerful data management abstraction that provides a unified interface to a SQLite database. It allows you to store, retrieve, transform, and query tabular data in various formats through a consistent API.
The core idea behind the datastore is to provide a robust, flexible system for data management that simplifies working with different data formats while offering persistence and advanced query capabilities.
## Key Features

- Format flexibility (Python dictionaries, CSV files, Pandas DataFrames, PyArrow Tables)
```python
from cosmotech.coal.store.store import Store
from cosmotech.coal.store.native_python import store_pylist

# We initialize and reset the data store
my_datastore = Store(reset=True)

# We create a simple list of dict data
my_data = [
    {"foo": "bar"},
    {"foo": "barbar"},
    {"foo": "world"},
    {"foo": "bar"},
]

# We use a bundled method to send the py_list to the store
store_pylist("my_data", my_data)

# We can make an SQL query over our data.
# Store.execute_query returns a pyarrow.Table object,
# so we can use Table.to_pylist to get an equivalent format
results = my_datastore.execute_query(
    "SELECT foo, count(*) as line_count FROM my_data GROUP BY foo"
).to_pylist()

# We can print our results now
print(results)
# > [{'foo': 'bar', 'line_count': 2}, {'foo': 'barbar', 'line_count': 1}, {'foo': 'world', 'line_count': 1}]
```
```python
import pathlib

from cosmotech.coal.store.store import Store
from cosmotech.coal.store.csv import store_csv_file, convert_store_table_to_csv

# Initialize the store
store = Store(reset=True)

# Load data from a CSV file
csv_path = pathlib.Path("path/to/your/data.csv")
store_csv_file("customers", csv_path)

# Materialize the query result as a new table so it can be exported
store.execute_query("""
    CREATE TABLE high_value_customers AS
    SELECT *
    FROM customers
    WHERE annual_spend > 10000
    ORDER BY annual_spend DESC
""")

# Export the results to a new CSV file
output_path = pathlib.Path("path/to/output/high_value_customers.csv")
convert_store_table_to_csv("high_value_customers", output_path)
```
```python
import pandas as pd

from cosmotech.coal.store.store import Store
from cosmotech.coal.store.pandas import store_dataframe, convert_store_table_to_dataframe

# Initialize the store
store = Store(reset=True)

# Create a pandas DataFrame
df = pd.DataFrame(
    {
        "product_id": [1, 2, 3, 4, 5],
        "product_name": ["Widget A", "Widget B", "Gadget X", "Tool Y", "Device Z"],
        "price": [19.99, 29.99, 99.99, 49.99, 199.99],
        "category": ["Widgets", "Widgets", "Gadgets", "Tools", "Devices"],
    }
)

# Store the DataFrame
store_dataframe("products", df)

# Materialize the query result as a table so it can be converted back
store.execute_query("""
    CREATE TABLE expensive_products AS
    SELECT *
    FROM products
    WHERE price > 50
    ORDER BY price DESC
""")

# Convert the results back to a pandas DataFrame for further analysis
expensive_df = convert_store_table_to_dataframe("expensive_products", store)

# Use pandas methods on the result
print(expensive_df.describe())
```
```python
import pyarrow as pa

from cosmotech.coal.store.store import Store
from cosmotech.coal.store.pyarrow import store_table

# Initialize the store
store = Store(reset=True)

# Create a PyArrow Table
data = {
    "date": pa.array(["2023-01-01", "2023-01-02", "2023-01-03"]),
    "value": pa.array([100, 150, 200]),
    "category": pa.array(["A", "B", "A"]),
}
table = pa.Table.from_pydict(data)

# Store the table
store_table("time_series", table)

# Query and retrieve data
result = store.execute_query("""
    SELECT date, SUM(value) as total_value
    FROM time_series
    GROUP BY date
""")
print(result)
```
```python
from cosmotech.coal.store.store import Store
from cosmotech.coal.store.native_python import store_pylist

store = Store(reset=True)

# Store customer data
customers = [
    {"customer_id": 1, "name": "Acme Corp", "segment": "Enterprise"},
    {"customer_id": 2, "name": "Small Shop", "segment": "SMB"},
    {"customer_id": 3, "name": "Tech Giant", "segment": "Enterprise"},
]
store_pylist("customers", customers, store=store)

# Store order data
orders = [
    {"order_id": 101, "customer_id": 1, "amount": 5000},
    {"order_id": 102, "customer_id": 2, "amount": 500},
    {"order_id": 103, "customer_id": 1, "amount": 7500},
    {"order_id": 104, "customer_id": 3, "amount": 10000},
]
store_pylist("orders", orders, store=store)

# Join tables to analyze orders by customer segment
results = store.execute_query("""
    SELECT
        c.segment,
        COUNT(o.order_id) as order_count,
        SUM(o.amount) as total_revenue
    FROM customers c
    JOIN orders o ON c.customer_id = o.customer_id
    GROUP BY c.segment
""").to_pylist()

print(results)
# > [{'segment': 'Enterprise', 'order_count': 3, 'total_revenue': 22500}, {'segment': 'SMB', 'order_count': 1, 'total_revenue': 500}]
```
```python
import pathlib

from cosmotech.coal.store.store import Store
from cosmotech.coal.store.native_python import convert_table_as_pylist
from cosmotech.coal.store.csv import store_csv_file, convert_store_table_to_csv

# Initialize the store
store = Store(reset=True)

# 1. Load raw data from CSV
raw_data_path = pathlib.Path("path/to/raw_data.csv")
store_csv_file("raw_data", raw_data_path, store=store)

# 2. Clean and transform the data
store.execute_query("""
    CREATE TABLE cleaned_data AS
    SELECT
        id,
        TRIM(name) as name,
        UPPER(category) as category,
        CASE WHEN value < 0 THEN 0 ELSE value END as value
    FROM raw_data
    WHERE id IS NOT NULL
""")

# 3. Aggregate the data
store.execute_query("""
    CREATE TABLE summary_data AS
    SELECT
        category,
        COUNT(*) as count,
        AVG(value) as avg_value,
        SUM(value) as total_value
    FROM cleaned_data
    GROUP BY category
""")

# 4. Export the results
summary_data = convert_table_as_pylist("summary_data", store=store)
print(summary_data)

# 5. Save to CSV for reporting
output_path = pathlib.Path("path/to/output/summary.csv")
convert_store_table_to_csv("summary_data", output_path, store=store)
```
### Step 1: Load data
```python
import pathlib

from cosmotech.coal.store.store import Store
from cosmotech.coal.store.csv import store_csv_file

# Initialize the store
store = Store(reset=True)

# Load raw data from CSV
raw_data_path = pathlib.Path("path/to/raw_data.csv")
store_csv_file("raw_data", raw_data_path, store=store)
```
### Step 2: Clean data
```python
# Clean and transform the data
store.execute_query("""
    CREATE TABLE cleaned_data AS
    SELECT
        id,
        TRIM(name) as name,
        UPPER(category) as category,
        CASE WHEN value < 0 THEN 0 ELSE value END as value
    FROM raw_data
    WHERE id IS NOT NULL
""")
```
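The cleaning expressions used here (`TRIM`, `UPPER`, `CASE`) are standard SQLite. Since the datastore is backed by SQLite, the same SQL can be checked with the stdlib `sqlite3` module; this sketch uses made-up sample rows to show what each expression produces:

```python
import sqlite3

# In-memory SQLite database standing in for the datastore's backing file
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE raw_data (id INTEGER, name TEXT, category TEXT, value REAL)")

# Hypothetical sample rows: padded name, mixed-case category,
# a negative value, and a row with a NULL id
conn.executemany(
    "INSERT INTO raw_data VALUES (?, ?, ?, ?)",
    [
        (1, "  alice ", "widgets", -5.0),
        (2, "bob", "Gadgets", 3.0),
        (None, "ghost", "x", 1.0),
    ],
)

# Same cleaning expressions as in the pipeline step above
rows = conn.execute("""
    SELECT
        id,
        TRIM(name) AS name,
        UPPER(category) AS category,
        CASE WHEN value < 0 THEN 0 ELSE value END AS value
    FROM raw_data
    WHERE id IS NOT NULL
""").fetchall()

print(rows)
# The NULL-id row is dropped, the name is trimmed, the category
# upper-cased, and the negative value clamped to 0
```

The same statement, wrapped in `CREATE TABLE cleaned_data AS ...`, is what the datastore executes.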
### Step 3: Aggregate data
```python
# Aggregate the data
store.execute_query("""
    CREATE TABLE summary_data AS
    SELECT
        category,
        COUNT(*) as count,
        AVG(value) as avg_value,
        SUM(value) as total_value
    FROM cleaned_data
    GROUP BY category
""")
```
### Step 4: Export results
```python
import pathlib

from cosmotech.coal.store.native_python import convert_table_as_pylist
from cosmotech.coal.store.csv import convert_store_table_to_csv

# Export to a Python list
summary_data = convert_table_as_pylist("summary_data", store=store)
print(summary_data)

# Save to CSV for reporting
output_path = pathlib.Path("path/to/output/summary.csv")
convert_store_table_to_csv("summary_data", output_path, store=store)
```
- Use `reset=True` when you want to start with a fresh database
- Omit the `reset` parameter (or set it to `False`) when you want to keep data between runs
- Specify a custom location with the `store_location` parameter if needed
**Store initialization options**
```python
# Fresh store each time
store = Store(reset=True)

# Persistent store at the default location
store = Store()

# Persistent store at a custom location
import pathlib

custom_path = pathlib.Path("/path/to/custom/location")
store = Store(store_location=custom_path)
```
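The persistence behavior above comes down to SQLite's file-backed storage: a store that is not reset simply reopens the same database file on the next run. A minimal stdlib `sqlite3` sketch of that mechanism, using a hypothetical temporary path in place of the store's actual location:

```python
import pathlib
import sqlite3
import tempfile

# Hypothetical file path standing in for the store's database location
db_path = pathlib.Path(tempfile.mkdtemp()) / "datastore.db"

# First "run": create a table and write data
conn = sqlite3.connect(db_path)
conn.execute("CREATE TABLE customers (name TEXT)")
conn.execute("INSERT INTO customers VALUES ('Acme Corp')")
conn.commit()
conn.close()

# Second "run": a brand-new connection to the same file
# sees the data written by the first one
conn = sqlite3.connect(db_path)
rows = conn.execute("SELECT name FROM customers").fetchall()
print(rows)  # [('Acme Corp',)]
conn.close()
```

Resetting the store corresponds to wiping this file and starting from an empty database.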
### Table management
- Use descriptive table names that reflect the data content
- Check whether a table exists before operating on it
- List the available tables to explore the database
```python
# Check if a table exists
if store.table_exists("customers"):
    # Do something with the table
    pass

# List all tables
for table_name in store.list_tables():
    print(f"Table: {table_name}")

    # Get schema information
    schema = store.get_table_schema(table_name)
    print(f"Schema: {schema}")
```
### Performance considerations
- For large datasets, consider chunking the data when loading
- Use SQL to filter data early rather than loading everything into memory
- Index frequently queried columns for better performance
**Handling large datasets**
```python
# Example of chunking a data load
# (`large_dataset` is assumed to be a list of dicts loaded elsewhere)
chunk_size = 10000
for i in range(0, len(large_dataset), chunk_size):
    chunk = large_dataset[i : i + chunk_size]
    store_pylist(f"data_chunk_{i // chunk_size}", chunk, store=store)

# Combine chunks with SQL
store.execute_query("""
    CREATE TABLE combined_data AS
    SELECT * FROM data_chunk_0
    UNION ALL
    SELECT * FROM data_chunk_1
    -- Add more chunks as needed
""")
```
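Indexing can likewise be done in SQL. Assuming `Store.execute_query` forwards arbitrary statements, including DDL, to the underlying SQLite database, a `CREATE INDEX` statement works exactly as in plain SQLite. This stdlib `sqlite3` sketch shows the idea, and how `EXPLAIN QUERY PLAN` confirms the index is actually used:

```python
import sqlite3

# In-memory SQLite database standing in for the datastore's backing file
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, customer_id INTEGER, amount REAL)")

# Insert some synthetic rows so the planner has something to work with
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(i, i % 100, float(i)) for i in range(10_000)],
)

# Index the column used in WHERE clauses; with the datastore you would
# issue the same statement through Store.execute_query
conn.execute("CREATE INDEX idx_orders_customer ON orders (customer_id)")

# The query plan reports a SEARCH using the index instead of a full scan
plan = conn.execute(
    "EXPLAIN QUERY PLAN SELECT SUM(amount) FROM orders WHERE customer_id = 42"
).fetchall()
print(plan)
```

Indexes speed up lookups and joins on the indexed column at the cost of extra storage and slower inserts, so they are best reserved for columns that are queried repeatedly.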
The datastore provides a powerful, flexible foundation for data management in your CosmoTech applications. By leveraging its capabilities, you can:
- Simplify data handling across different formats
- Build robust data processing pipelines
- Perform complex data transformations and analyses
- Maintain data persistence between application runs
- Integrate seamlessly with other CosmoTech components
Whether you're working with small datasets or large-scale data processing tasks, the datastore offers the tools you need to manage your data effectively.