Working With Data
The decision_os_client.models.data_reference.DataReference class is a thin wrapper around a polars.LazyFrame, which means the entire polars API is at your disposal without needing to collect the data into an in-memory DataFrame before you start processing.
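For example, standard polars expressions can be chained directly on a reference. The sketch below is illustrative only: the file path and the column names 'category' and 'value' are placeholders, not part of any real dataset.
import polars as pl
from decision_os_client import DataReference
# Build a reference to a local csv (see "Constructing a DataReference" below).
# The path and column names here are placeholders for this sketch.
ref = DataReference.from_csv(path='path/to.csv')
# Any LazyFrame operation can be chained directly on the reference; nothing is executed yet.
lazy_result = ref.filter(pl.col('value') > 0).group_by('category').agg(pl.col('value').sum())
# Only collect() materializes the result as an in-memory polars.DataFrame.
print(lazy_result.collect())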
Key Capabilities
Serializing a DataReference is quick and the result is portable, so it can be passed between processes and pipeline steps without moving large amounts of data. The write method on all connection data sources returns a DataReference, so you can write a result to a source and return a reference to it in the same call, e.g. the pipeline step below:
import polars as pl
from decision_os_client import DataReference
from decision_os_client.connections import GCPConnection

# This is an example using a GCP connection to write back to BigQuery
gcp_conn = await GCPConnection.get(name='conn-name')

# Define a simple pipeline step that could be used with Cloud Composer.
def pipeline_step(input_ref: DataReference) -> DataReference:
    # Run some basic processing on the data using polars.
    processed = input_ref.group_by("a").agg([pl.col("b"), pl.col("c")])
    # Use the connection to write the result back to a new table. This function returns a
    # reference for the write location that can be passed to the next step in the pipeline
    # as required.
    return gcp_conn.data.write(processed, table='post-pipeline-step')
The Polars library enables extra optimizations under the hood in the example above:
- The processing is defined lazily, so nothing is pulled from the source until we call write to push the processed data to the new location.
- Streaming is enabled by default, so processing should never blow up memory regardless of the size of the source dataset.
- Certain parts of queries are pushed down to the source as SQL. In the example above, the selection of columns a, b and c is pushed down to the source automatically, as sketched below. This behavior is currently supported for column selections and row limits where possible; in the future, filtering operations will also be pushed down to the source where possible.
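As a rough illustration of this push-down behavior (gcp_conn and the table name reuse the placeholders from the surrounding examples):
# Sketch only: gcp_conn and the table name follow the placeholders used elsewhere on this page.
ref = gcp_conn.data.get(table='dataset_name.table_name')
# The column selection and row limit below can be translated into SQL against the source,
# so only the requested columns and rows are transferred when collect() runs.
subset = ref.select(["a", "b", "c"]).head(100)
print(subset.collect())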
Constructing a DataReference
DataReferences can be constructed directly or they can be created implicitly by requesting data from a remote source.
To create a DataReference from a local file or an in-memory dataframe, you can use the following methods:
import numpy
import pandas
from decision_os_client import DataReference
# Create a reference to a local csv
ref = DataReference.from_csv(path='path/to.csv')
# Create a reference to a local parquet file
ref = DataReference.from_parquet(path='path/to.parquet')
# Create a reference to an in memory pandas dataframe
df = pandas.DataFrame({'x': numpy.random.normal(size=100), 'y': numpy.random.normal(size=100)})
ref = DataReference.from_pandas(df)
If your data is in a remote location, e.g. BigQuery, Snowflake or Databricks, then you can use the connections API to fetch a reference to a particular table, or even to a SQL query spanning multiple tables. The example below uses BigQuery; see the connection setup guides for getting started with other sources.
from decision_os_client.connections import GCPConnection
# Get your GCPConnection, check the connection setup guide if you don't have one
gcp_conn = await GCPConnection.get(name='connection-name')
# Get a reference to a single table (note BigQuery needs the dataset name passed as well)
ref = gcp_conn.data.get(table='dataset_name.table_name')
# Take the first 10 rows and collect them into an in-memory polars.DataFrame.
print(ref.head(10).collect())

# Alternatively we can get a reference based on a SQL query.
# In this example we join two tables together and then return a couple of columns from each.
ref = gcp_conn.data.get(
    query="""
        SELECT table1.cola, table1.colb, table2.col1, table2.col2
        FROM dataset_name.table1
        LEFT JOIN dataset_name.table2
        ON table1.cola = table2.cola;
    """
)
# Take the first 10 rows and collect them into an in-memory polars.DataFrame.
print(ref.head(10).collect())
When a query is passed, the schema needs to be determined for the library to work correctly. This is done by running the query and returning a single row of the result. If the query is very expensive and the engine cannot optimize the applied LIMIT 1 to run quickly, then initializing the class may take some time.
Consuming a DataReference
The built-in LazyFrame API leaves a lot of choices up to the user and isn't always consistent with itself (primarily to distinguish between the lazy and non-lazy versions). Since DataReferences only support the lazy version of the API, they include a set of convenience methods to smooth out these inconsistencies and make certain operations a bit easier.
It's worth noting that none of these methods override existing methods of the underlying polars API. Any polars example will work with a DataReference object as if it were any other polars.LazyFrame object.
To better support getting from a DataReference to a different format, the following methods have been added to the API:
- .to_pandas() - Collects the LazyFrame to a concrete dataset in memory and converts it to a pandas.DataFrame.
- .to_csv(path='path/to.csv') - Collects the LazyFrame to a csv file in a streaming manner.
- .to_parquet(path='path/to.parquet') - Collects the LazyFrame to a parquet file in a streaming manner.
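For example, assuming ref is a DataReference obtained as in the earlier examples (the output paths below are placeholders):
# Materialize the reference as an in-memory pandas.DataFrame.
df = ref.to_pandas()
# Stream the data out to local files without materializing the whole dataset in memory.
ref.to_csv(path='output/data.csv')
ref.to_parquet(path='output/data.parquet')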
Sampling at Source
The sample method enables sampling to be pushed down to the source. In the examples below, sampling is performed in the underlying source where supported, and only the sampled data is returned for further processing.
from decision_os_client.connections import GCPConnection
from decision_os_client.models.sampling_options import SamplingOptions
# Get your GCPConnection, check the connection setup guide if you don't have one
gcp_conn = await GCPConnection.get(name='connection-name')
# Get a reference to a single table (note BigQuery needs the dataset name passed as well)
ref = gcp_conn.data.get(table='dataset_name.table_name')
# Pull back the first 1000 rows
ref.sample(sampling_options=SamplingOptions(n=1000)).collect()
# Pull back a random 10% sample of the dataset
ref.sample(sampling_options=SamplingOptions(frac=0.1)).collect()
# Some sources also support setting a seed for fractional sampling to ensure it's repeatable. If seed is not supported
# by a source then a warning will be shown
ref.sample(sampling_options=SamplingOptions(frac=0.1, seed=42)).collect()
Writing Back to Source
Once you've performed transformations on the data, you will likely want to store the result somewhere for access in later pipeline/processing steps rather than recomputing it. To facilitate this, Connection objects expose a write method that makes it easy to write back to the source.
import polars as pl
from decision_os_client import DataReference
from decision_os_client.connections import DatabricksConnection
# This is an example using a Databricks connection
db_conn = await DatabricksConnection.get(name='conn-name')
# Get an initial reference
ref = db_conn.data.get(table='dataset_name.table_name')
# Run some basic processing on the data using polars.
processed = ref.group_by("a").agg([pl.col("b"), pl.col("c")])
# Use the connection's data.write method to push the result back to a new table. If the table already exists then this
# will raise an error.
out_ref = db_conn.data.write(processed, table='post-pipeline-step')
# If you'd like to append your data to an existing table then add the append flag
out_ref = db_conn.data.write(processed, table='post-pipeline-step', append=True)
# If you'd like to overwrite the data in an existing table then add the overwrite flag
out_ref = db_conn.data.write(processed, table='post-pipeline-step', overwrite=True)
In each of the examples above, the write method returns a new DataReference object pointing to the written location. This allows you to write to a source and create a reference that can be passed on to other processes in a single call.
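Continuing the example above, the returned reference can be consumed immediately or handed to a later step:
# out_ref behaves like any other DataReference, so it can be inspected straight away.
print(out_ref.head(5).collect())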
Understanding the Structure of the Source
The connection.data.get method is specifically intended for fetching tabular data rather than performing system queries like SHOW TABLES; these will fail because the passed query undergoes extra manipulation to enable the push-down of certain operations from the returned DataReference object. To let you inspect basic information about the source, the following methods are exposed on every connection class:
from decision_os_client.connections import GCPConnection

gcp_conn = await GCPConnection.get(name='connection-name')
# Get a list of all available schemas
gcp_conn.data.get_all_schemas()
# Get a list of all available tables under a specific schema
gcp_conn.data.get_all_tables(schema='<schema_name>')
# Get a list of all available columns under a specific table
gcp_conn.data.get_all_columns(schema='<schema_name>', table='<table_name>')
A Note on Serialization
DataReference objects are designed to serialize into small reference objects that can be quickly passed between processes and machines. However, to achieve this there are a few things to be aware of:
- When serializing a DataReference that refers to an in-memory pandas.DataFrame, the pandas.DataFrame itself is serialized into the DataReference, as there is no other way to ensure the reference works wherever it gets deserialized.
- When serializing a DataReference, the credentials for accessing the source are serialized along with it. This is required so the credentials are always present when the serialized object is accessed.
- When using it with the @remote decorator, credentials are encrypted during serialization using a unique generated key that is then set as a secret on the running job.
- If you are serializing a DataReference outside of the existing system, you will need to ensure the encryption key is properly configured. You can do this using the following utility:
import os
import pickle

from decision_os_client.connections import GCPConnection
from decision_os_client.utilities.secure_pickle import use_secure_pickle

gcp_conn = await GCPConnection.get(name='connection-name')
ref = gcp_conn.data.get(table='dataset_name.table_name')

# Serialize the reference securely
with use_secure_pickle() as key:
    # Pickle the DataReference here
    serialized = pickle.dumps(ref)

# To deserialize, either set the key in the environment
os.environ['ENCRYPTION_KEY'] = key
# or use the context manager again but pass in the key
with use_secure_pickle(key):
    # Unpickle the DataReference here
    restored = pickle.loads(serialized)