Implementa un modelo en Vertex AI y obtén predicciones

Después de entrenar un modelo en un clúster de Ray en Vertex AI, puedes implementar el modelo para las solicitudes de predicción en línea mediante el siguiente proceso:

Antes de comenzar, asegúrate de leer la descripción general de Ray en Vertex AI y configurar todas las herramientas de requisitos que necesitas.

En los pasos de esta sección, se supone que estás usando el SDK de Ray en Vertex AI en un entorno interactivo de Python.

Importa e inicializa el cliente de Ray en Vertex AI

Si ya estás conectado a tu Ray en Vertex AI clúster, reinicia tu kernel y ejecuta el siguiente código. La variable runtime_env es necesaria en el momento de la conexión para ejecutar comandos de predicción en línea.

import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

# Initialize Vertex AI to retrieve projects for downstream operations.
aiplatform.init(staging_bucket=BUCKET_URI)

# Shutdown cluster and reconnect with required dependencies in the runtime_env.
ray.shutdown()

Aquí:

  • CLUSTER_RESOURCE_NAME: Es el nombre completo del recurso para el clúster de Ray en Vertex AI que debe ser único en todo el proyecto.

  • BUCKET_URI es el bucket de Cloud Storage para almacenar los artefactos del modelo.

Entrena y exporta el modelo a Vertex AI Model Registry

Exporta el modelo de Vertex AI desde el punto de control de Ray y sube el modelo a Vertex AI Model Registry.

TensorFlow

import numpy as np
from ray.air import session, CheckpointConfig, ScalingConfig
from ray.air.config import RunConfig
from ray.train import SyncConfig
from ray.train.tensorflow import TensorflowCheckpoint, TensorflowTrainer
from ray import train
import tensorflow as tf

from vertex_ray.predict import tensorflow

# Required dependencies at runtime
runtime_env = {
  "pip": [
      "ray==2.9.3", # pin the Ray version to prevent it from being overwritten
      "tensorflow",
      "IPython",
      "numpy",
  ],
}

# Initialize  Ray on Vertex AI client for remote cluster connection
ray.init(address=address, runtime_env=runtime_env)

# Define a TensorFlow model.

def create_model():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))])
  model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"])
  return model

def train_func(config):
  n = 100
  # Create a fake dataset
  # data   : X - dim = (n, 4)
  # target : Y - dim = (n, 1)
  X = np.random.normal(0, 1, size=(n, 4))
  Y = np.random.uniform(0, 1, size=(n, 1))

  strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
  with strategy.scope():
      model = create_model()
      print(model)

  for epoch in range(config["num_epochs"]):
      model.fit(X, Y, batch_size=20)
      model.save("temp/my_model")
      checkpoint = TensorflowCheckpoint.from_saved_model("temp/my_model")
      train.report({}, checkpoint=checkpoint)

trainer = TensorflowTrainer(
  train_func,
  train_loop_config={"num_epochs": 5},
  scaling_config=ScalingConfig(num_workers=1),
  run_config=RunConfig(
      storage_path=f'{BUCKET_URI}/ray_results/tensorflow',
      checkpoint_config=CheckpointConfig(
          num_to_keep=1  # Keep all checkpoints.
      ),
      sync_config=SyncConfig(
          sync_artifacts=True,
      ),
  ),
)

# Train the model.
result = trainer.fit()

# Register the trained model to Vertex AI Model Registry.
vertex_model = tensorflow.register_tensorflow(
  result.checkpoint,
)

sklearn

from vertex_ray.predict import sklearn
from ray.train.sklearn import SklearnCheckpoint

vertex_model = sklearn.register_sklearn(
  result.checkpoint,
)

XGBoost

from vertex_ray.predict import xgboost
from ray.train.xgboost import XGBoostTrainer

# Initialize  Ray on Vertex AI client for remote cluster connection
ray.init(address=address, runtime_env=runtime_env)

# Define a XGBoost model.
train_dataset = ray.data.from_pandas(
pd.DataFrame([{"x": x, "y": x + 1} for x in range(32)]))

run_config = RunConfig(
storage_path=f'{BUCKET_URI}/ray_results/xgboost',
checkpoint_config=CheckpointConfig(
    num_to_keep=1  # Keep all checkpoints.
),
sync_config=SyncConfig(sync_artifacts=True),
)

trainer = XGBoostTrainer(
label_column="y",
params={"objective": "reg:squarederror"},
scaling_config=ScalingConfig(num_workers=3),
datasets={"train": train_dataset},
run_config=run_config,
)
# Train the model.
result = trainer.fit()

# Register the trained model to Vertex AI Model Registry.
vertex_model = xgboost.register_xgboost(
result.checkpoint,
)

PyTorch

  • Convierte los puntos de control de Ray en un modelo.

  • Compila model.mar.

  • Crea LocalModel mediante model.mar.

  • Sube al Model Registry de Vertex AI.

Implementa el modelo para predicciones en línea

Implementa el modelo en el extremo en línea: Para obtener más información, consulta Implementa el modelo en un extremo.

DEPLOYED_NAME = model.display_name + "-endpoint"
TRAFFIC_SPLIT = {"0": 100}
MACHINE_TYPE = "n1-standard-4"

endpoint = vertex_model.deploy(
    deployed_model_display_name=DEPLOYED_NAME,
    traffic_split=TRAFFIC_SPLIT,
    machine_type=MACHINE_TYPE,
)

Aquí:

  • DEPLOYED_NAME: El nombre visible del modelo implementado (opcional). Si no se proporciona en la creación, se usa el display_name del modelo.

  • TRAFFIC_SPLIT: Un mapa del ID de un modelo implementado al porcentaje del tráfico de este extremo que se debe reenviar a ese modelo implementado (opcional). Si el ID de un modelo implementado no aparece en este mapa, no recibe tráfico. Los valores del porcentaje de tráfico deben sumar hasta 100, o el mapa debe estar vacío si el extremo no acepta tráfico en este momento. La clave para el modelo que se implementa es "0". Por ejemplo, {"0": 100}.

  • MACHINE_TYPE: Especifica los recursos de procesamiento (opcional).

Realiza una solicitud de predicción

Envía una solicitud de predicción al extremo. Para obtener más información, consulta Obtén predicciones en línea a partir de un modelo entrenado personalizado.

pred_request = [
    [ 1.7076793 , 0.23412449, 0.95170785, -0.10901471],
    [-0.81881499, 0.43874669, -0.25108584, 1.75536031]
]

endpoint.predict(pred_request)

Deberías obtener un resultado como el siguiente:

Prediction(predictions=[0.7891440987586975, 0.5843208432197571],
 deployed_model_id='3829557218101952512',
 model_version_id='1',
 model_resource_name='projects/123456789/locations/us-central1/models/123456789101112',
 explanations=None)

¿Qué sigue?