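After you train a model on a Ray cluster on Vertex AI, you can deploy it for online prediction requests using the following process:
Export the model from the Ray checkpoint.
Upload the model to Vertex AI Model Registry.
Deploy the model to an endpoint.
Make prediction requests.
The steps in this section assume that you are using the Ray on Vertex AI SDK in an interactive Python environment.
Import and initialize the Ray on Vertex AI client
If you are already connected to a Ray cluster on Vertex AI, restart the kernel and run the following code. The runtime_env variable is required at connection time in order to run online prediction commands.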
import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

# Initialize Vertex AI to retrieve projects for downstream operations.
aiplatform.init(staging_bucket=BUCKET_URI)

# Shutdown cluster and reconnect with required dependencies in the runtime_env.
ray.shutdown()
Where:
CLUSTER_RESOURCE_NAME: The full resource name of the Ray on Vertex AI cluster. The name must be unique across your project.
BUCKET_URI: The Cloud Storage bucket used to store the model artifacts.
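The address used above is the cluster's resource name prefixed with the vertex_ray:// scheme. The following is a minimal sketch of building and sanity-checking that string before connecting. The helper name build_cluster_address is hypothetical, and the projects/.../locations/.../persistentResources/... layout it checks is an assumption about the resource name format rather than something stated on this page; relax the check if your resource names look different.

```python
import re

# Assumed layout of a Ray on Vertex AI cluster resource name.
# This pattern is an assumption, not taken from the text above.
_RESOURCE_NAME_PATTERN = re.compile(
    r"^projects/[^/]+/locations/[^/]+/persistentResources/[^/]+$"
)


def build_cluster_address(cluster_resource_name: str) -> str:
    """Return the vertex_ray:// address for a cluster resource name."""
    if not _RESOURCE_NAME_PATTERN.match(cluster_resource_name):
        raise ValueError(
            f"Unexpected cluster resource name: {cluster_resource_name!r}"
        )
    return "vertex_ray://{}".format(cluster_resource_name)
```

Failing early on a malformed name tends to produce a clearer error than a connection attempt that fails later.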
Train and export the model to Vertex AI Model Registry
Export the Vertex AI model from the Ray checkpoint and upload the model to Vertex AI Model Registry.
TensorFlow
import numpy as np
from ray.air import CheckpointConfig, ScalingConfig
from ray.air.config import RunConfig
from ray.train import SyncConfig
from ray.train.tensorflow import TensorflowCheckpoint, TensorflowTrainer
from ray import train
import tensorflow as tf

from vertex_ray.predict import tensorflow

# Required dependencies at runtime
runtime_env = {
    "pip": [
        "ray==2.9.3",  # pin the Ray version to prevent it from being overwritten
        "tensorflow",
        "IPython",
        "numpy",
    ],
}

# Initialize Ray on Vertex AI client for remote cluster connection
ray.init(address=address, runtime_env=runtime_env)

# Define a TensorFlow model.
def create_model():
    model = tf.keras.Sequential(
        [tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))]
    )
    model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"])
    return model

def train_func(config):
    n = 100
    # Create a fake dataset
    # data   : X - dim = (n, 4)
    # target : Y - dim = (n, 1)
    X = np.random.normal(0, 1, size=(n, 4))
    Y = np.random.uniform(0, 1, size=(n, 1))

    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
    with strategy.scope():
        model = create_model()
        print(model)

    for epoch in range(config["num_epochs"]):
        model.fit(X, Y, batch_size=20)
        # Save the model and report it as a checkpoint after each epoch.
        model.save("temp/my_model")
        checkpoint = TensorflowCheckpoint.from_saved_model("temp/my_model")
        train.report({}, checkpoint=checkpoint)

trainer = TensorflowTrainer(
    train_func,
    train_loop_config={"num_epochs": 5},
    scaling_config=ScalingConfig(num_workers=1),
    run_config=RunConfig(
        storage_path=f'{BUCKET_URI}/ray_results/tensorflow',
        checkpoint_config=CheckpointConfig(
            num_to_keep=1  # Keep only the most recent checkpoint.
        ),
        sync_config=SyncConfig(
            sync_artifacts=True,
        ),
    ),
)

# Train the model.
result = trainer.fit()

# Register the trained model to Vertex AI Model Registry.
vertex_model = tensorflow.register_tensorflow(
    result.checkpoint,
)
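The runtime_env dictionary in the TensorFlow example pins Ray to an exact version so that installing the other packages does not overwrite it. As a rough sketch of that idea, the hypothetical helper below (build_runtime_env is not part of the Ray or Vertex AI SDKs) assembles the same kind of dictionary and drops any unpinned Ray entry the caller passes in, so the pinned version is the only one listed.

```python
import re
from typing import Dict, List


def build_runtime_env(ray_version: str, packages: List[str]) -> Dict[str, List[str]]:
    """Build a runtime_env dict with Ray pinned to an exact version."""

    def package_name(requirement: str) -> str:
        # Strip version specifiers and extras, e.g. "ray[default]>=2.0" -> "ray".
        return re.split(r"[=<>!~\[ ]", requirement.strip(), maxsplit=1)[0].lower()

    # Remove caller-supplied Ray entries so only the pinned one remains.
    others = [p for p in packages if package_name(p) != "ray"]
    return {"pip": [f"ray=={ray_version}"] + others}


runtime_env = build_runtime_env("2.9.3", ["tensorflow", "IPython", "numpy"])
```

The resulting dictionary has the same shape as the one in the example and can be passed to ray.init in the same way.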
sklearn
from vertex_ray.predict import sklearn
from ray.train.sklearn import SklearnCheckpoint

# result is the output of a completed Ray trainer run (trainer.fit()).
vertex_model = sklearn.register_sklearn(
    result.checkpoint,
)
XGBoost
import pandas as pd
import ray
from ray.air import CheckpointConfig, ScalingConfig
from ray.air.config import RunConfig
from ray.train import SyncConfig
from ray.train.xgboost import XGBoostTrainer

from vertex_ray.predict import xgboost

# Initialize Ray on Vertex AI client for remote cluster connection.
# address and runtime_env are assumed to be defined as in the earlier steps.
ray.init(address=address, runtime_env=runtime_env)

# Define an XGBoost model.
train_dataset = ray.data.from_pandas(
    pd.DataFrame([{"x": x, "y": x + 1} for x in range(32)]))

run_config = RunConfig(
    storage_path=f'{BUCKET_URI}/ray_results/xgboost',
    checkpoint_config=CheckpointConfig(
        num_to_keep=1  # Keep only the most recent checkpoint.
    ),
    sync_config=SyncConfig(sync_artifacts=True),
)

trainer = XGBoostTrainer(
    label_column="y",
    params={"objective": "reg:squarederror"},
    scaling_config=ScalingConfig(num_workers=3),
    datasets={"train": train_dataset},
    run_config=run_config,
)

# Train the model.
result = trainer.fit()

# Register the trained model to Vertex AI Model Registry.
vertex_model = xgboost.register_xgboost(
    result.checkpoint,
)
PyTorch
Convert the Ray checkpoint to a model.
Build model.mar.
Create a LocalModel using model.mar.
Upload the model to Vertex AI Model Registry.
Deploy the model
Deploy the model to an online endpoint. For more information, see Deploy a model to an endpoint.
# vertex_model is the model registered in the previous step.
DEPLOYED_NAME = vertex_model.display_name + "-endpoint"
TRAFFIC_SPLIT = {"0": 100}
MACHINE_TYPE = "n1-standard-4"

endpoint = vertex_model.deploy(
    deployed_model_display_name=DEPLOYED_NAME,
    traffic_split=TRAFFIC_SPLIT,
    machine_type=MACHINE_TYPE,
)
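Where:
(Optional) DEPLOYED_NAME: The display name of the deployed model. If you don't provide one at creation time, the model's display_name is used.
(Optional) TRAFFIC_SPLIT: A map from a deployed model's ID to the percentage of this endpoint's traffic that should be forwarded to that deployed model. If a deployed model's ID isn't listed in this map, it receives no traffic. The traffic percentage values must add up to 100, or the map must be empty if the endpoint isn't accepting any traffic at the moment. The key for the model being deployed is "0". For example, {"0": 100}.
(Optional) MACHINE_TYPE: Specifies the compute resources.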
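The TRAFFIC_SPLIT rules above (percentages add up to 100, or the map is empty) can be checked locally before calling deploy. This is a minimal sketch; validate_traffic_split is a hypothetical helper, not part of the Vertex AI SDK, and the per-entry range check is an added assumption.

```python
from typing import Dict


def validate_traffic_split(traffic_split: Dict[str, int]) -> None:
    """Raise ValueError if the split breaks the rules described above."""
    if not traffic_split:
        # An empty map means the endpoint accepts no traffic for now.
        return
    for model_id, percent in traffic_split.items():
        # Assumed extra check: each share is a percentage between 0 and 100.
        if not 0 <= percent <= 100:
            raise ValueError(
                f"Percentage for {model_id!r} is out of range: {percent}"
            )
    total = sum(traffic_split.values())
    if total != 100:
        raise ValueError(f"Traffic percentages sum to {total}, expected 100")


# "0" is the key for the model being deployed.
validate_traffic_split({"0": 100})
```

A split such as {"0": 90} fails this check because the remaining 10 percent isn't assigned to any deployed model.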
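Make a prediction request
Send a prediction request to the endpoint. For more information, see Get online predictions from a custom trained model.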
# Each row is one instance with four features, matching the model's input shape.
pred_request = [
    [ 1.7076793 ,  0.23412449,  0.95170785, -0.10901471],
    [-0.81881499,  0.43874669, -0.25108584,  1.75536031]
]

endpoint.predict(pred_request)
You should get output like the following:
Prediction(predictions=[0.7891440987586975, 0.5843208432197571], deployed_model_id='3829557218101952512', model_version_id='1', model_resource_name='projects/123456789/locations/us-central1/models/123456789101112', explanations=None)
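The sample output contains one prediction per instance in the request. The sketch below pairs each input row with its prediction, assuming the predictions come back in the same order as the instances. pair_predictions is a hypothetical helper, and the values are copied from the sample request and output above; with a live endpoint you would pass the predictions field of the returned Prediction object instead.

```python
from typing import Any, List, Sequence, Tuple


def pair_predictions(
    instances: Sequence[Sequence[float]], predictions: Sequence[Any]
) -> List[Tuple[Sequence[float], Any]]:
    """Pair each request instance with its prediction, assuming matching order."""
    if len(instances) != len(predictions):
        raise ValueError(
            f"Got {len(predictions)} predictions for {len(instances)} instances"
        )
    return list(zip(instances, predictions))


pred_request = [
    [1.7076793, 0.23412449, 0.95170785, -0.10901471],
    [-0.81881499, 0.43874669, -0.25108584, 1.75536031],
]
# Values copied from the sample output above.
predictions = [0.7891440987586975, 0.5843208432197571]

for features, value in pair_predictions(pred_request, predictions):
    print(f"{features} -> {value:.4f}")
```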