在 Vertex AI 上运行 Ray 应用时,您可以将 BigQuery 用作云数据库。本部分介绍如何从 Vertex AI 上的 Ray 集群对 BigQuery 数据库进行读取和写入操作。本部分中的步骤假定您使用的是 Python 版 Vertex AI SDK。
如果要从 BigQuery 数据集读取数据,您应该新建 BigQuery 数据集或使用现有数据集。
导入并初始化 Ray on Vertex AI 客户端
如果您已连接到 Vertex AI 上的 Ray 集群,请重启内核并运行以下代码。连接时必须采用 runtime_env
变量才能运行 BigQuery 命令。
import ray from google.cloud import aiplatform # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) runtime_env = { "pip": ["google-cloud-aiplatform[ray]","ray==2.9.3"] } ray.init(address=address, runtime_env=runtime_env)
从 BigQuery 中读取数据
从 BigQuery 数据集读取数据。读取必须在 Ray 任务中进行。
aiplatform.init(project=project_id, location=location) @ray.remote def run_remotely(): import vertex_ray dataset = DATASET parallelism = PARALLELISM query = QUERY ds = vertex_ray.data.read_bigquery( dataset=dataset, parallelism=parallelism, query=query ) ds.materialize()
其中:
PROJECT_ID:Google Cloud 项目 ID。 您可以在 Google Cloud 控制台欢迎页面中找到项目 ID。
DATASET:BigQuery 数据集。必须采用
dataset.table
格式。如果提供查询,则设置为None
。NUM_BLOCKS:一个整数,将影响并行创建的读取任务数。创建的读取流数可能少于您请求的数量。
SQL_QUERY:包含要从 BigQuery 数据库读取的 SQL 查询的字符串。如果不需要查询,则设置为
None
。
转换数据
使用 pyarrow
或 pandas
更新和删除 BigQuery 表中的行和列。如果您想使用 pandas
转换,建议将输入类型保留为 pyrow 类型,并转换为用户定义的函数 (UDF) 中的 pandas
类型,以便您可以捕获 UDF 中的任何 pandas
转换类型错误。转换必须在 Ray 任务中进行。
@ray.remote def run_remotely(): # BigQuery Read first import pandas as pd import pyarrow as pa def filter_batch(table: pa.Table) -> pa.Table: df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get) # PANDAS_TRANSFORMATIONS_HERE return pa.Table.from_pandas(df) ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle() ds.materialize() # You can repartition before writing to determine the number of write blocks ds = ds.repartition(4) ds.materialize()
将数据写入 BigQuery
将数据插入 BigQuery 数据集。 写入必须在 Ray 任务中进行。
@ray.remote def run_remotely(): # BigQuery Read and optional data transformation first dataset=DATASET vertex_ray.data.write_bigquery( ds, dataset=dataset )
其中:
- DATASET:BigQuery 数据集。必须采用
dataset.table
格式。