Esegui il deployment di un modello su Vertex AI e ottieni previsioni

Dopo aver addestrato un modello su un cluster Ray su Vertex AI, puoi eseguire il deployment del modello per le richieste di previsione online utilizzando il seguente processo:

Prima di iniziare, assicurati di leggere la panoramica di Ray on Vertex AI e di configurare tutti gli strumenti prerequisiti di cui hai bisogno.

I passaggi di questa sezione presuppongono che utilizzi l'SDK Ray on Vertex AI in un ambiente Python interattivo.

Importa e inizializza Ray sul client Vertex AI

Se hai già stabilito la connessione al tuo cluster Ray su Vertex AI, riavvia il kernel ed esegui il codice seguente. La variabile runtime_env è necessaria al momento della connessione per eseguire i comandi di previsione online.

import ray
import vertexai

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

# Initialize Vertex AI to retrieve projects for downstream operations.
vertexai.init(staging_bucket=BUCKET_URI)

# Shutdown cluster and reconnect with required dependencies in the runtime_env.
ray.shutdown()

Dove:

  • CLUSTER_RESOURCE_NAME: il nome completo della risorsa per il cluster Ray on Vertex AI, che deve essere univoco nel progetto.

  • BUCKET_URI è il bucket Cloud Storage in cui archiviare gli artefatti del modello.

Addestra ed esporta il modello in Vertex AI Model Registry

Esporta il modello Vertex AI dal checkpoint Ray e caricalo in Vertex AI Model Registry.

TensorFlow

import numpy as np
from ray.air import session, CheckpointConfig, ScalingConfig
from ray.air.config import RunConfig
from ray.train import SyncConfig
from ray.train.tensorflow import TensorflowCheckpoint, TensorflowTrainer
from ray import train
import tensorflow as tf

from vertex_ray.predict import tensorflow

# Required dependencies at runtime
runtime_env = {
  "pip": [
      "ray==2.9.3", # pin the Ray version to prevent it from being overwritten
      "tensorflow",
      "IPython",
      "numpy",
  ],
}

# Initialize  Ray on Vertex AI client for remote cluster connection
ray.init(address=address, runtime_env=runtime_env)

# Define a TensorFlow model.

def create_model():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))])
  model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"])
  return model

def train_func(config):
  n = 100
  # Create a fake dataset
  # data   : X - dim = (n, 4)
  # target : Y - dim = (n, 1)
  X = np.random.normal(0, 1, size=(n, 4))
  Y = np.random.uniform(0, 1, size=(n, 1))

  strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
  with strategy.scope():
      model = create_model()
      print(model)

  for epoch in range(config["num_epochs"]):
      model.fit(X, Y, batch_size=20)
      tf.saved_model.save(model, "temp/my_model")
      checkpoint = TensorflowCheckpoint.from_saved_model("temp/my_model")
      train.report({}, checkpoint=checkpoint)

trainer = TensorflowTrainer(
  train_func,
  train_loop_config={"num_epochs": 5},
  scaling_config=ScalingConfig(num_workers=1),
  run_config=RunConfig(
      storage_path=f'{BUCKET_URI}/ray_results/tensorflow',
      checkpoint_config=CheckpointConfig(
          num_to_keep=1  # Keep all checkpoints.
      ),
      sync_config=SyncConfig(
          sync_artifacts=True,
      ),
  ),
)

# Train the model.
result = trainer.fit()

# Register the trained model to Vertex AI Model Registry.
vertex_model = tensorflow.register_tensorflow(
  result.checkpoint,
)

sklearn

from vertex_ray.predict import sklearn
from ray.train.sklearn import SklearnCheckpoint

vertex_model = sklearn.register_sklearn(
  result.checkpoint,
)

XGBoost

from vertex_ray.predict import xgboost
from ray.train.xgboost import XGBoostTrainer

# Initialize  Ray on Vertex AI client for remote cluster connection
ray.init(address=address, runtime_env=runtime_env)

# Define a XGBoost model.
train_dataset = ray.data.from_pandas(
pd.DataFrame([{"x": x, "y": x + 1} for x in range(32)]))

run_config = RunConfig(
storage_path=f'{BUCKET_URI}/ray_results/xgboost',
checkpoint_config=CheckpointConfig(
    num_to_keep=1  # Keep all checkpoints. 
),
sync_config=SyncConfig(sync_artifacts=True),
)

trainer = XGBoostTrainer(
label_column="y",
params={"objective": "reg:squarederror"},
scaling_config=ScalingConfig(num_workers=3),
datasets={"train": train_dataset},
run_config=run_config,
)
# Train the model.
result = trainer.fit()

# Register the trained model to Vertex AI Model Registry.
vertex_model = xgboost.register_xgboost(
result.checkpoint,
)

PyTorch

  • Converti i checkpoint Ray in un modello.

  • Crea model.mar.

  • Crea un LocalModel utilizzando model.mar.

  • Carica su Vertex AI Model Registry.

Esegui il deployment del modello per le previsioni online

Eseguire il deployment del modello nell'endpoint online. Per maggiori informazioni, consulta Eseguire il deployment del modello su un endpoint.

DEPLOYED_NAME = model.display_name + "-endpoint"
TRAFFIC_SPLIT = {"0": 100}
MACHINE_TYPE = "n1-standard-4"

endpoint = vertex_model.deploy(
    deployed_model_display_name=DEPLOYED_NAME,
    traffic_split=TRAFFIC_SPLIT,
    machine_type=MACHINE_TYPE,
)

Dove:

  • (Facoltativo) DEPLOYED_NAME: il nome visualizzato del modello di cui è stato eseguito il deployment. Se non viene fornito al momento della creazione, viene utilizzato il valore display_name del modello.

  • (Facoltativo) TRAFFIC_SPLIT: una mappa dall'ID di un modello di cui è stato eseguito il deployment alla percentuale di traffico di questo endpoint che dovrebbe essere inoltrata al modello di cui è stato eseguito il deployment. Se l'ID di un modello di cui è stato eseguito il deployment non è elencato in questa mappa, non riceve traffico. La somma dei valori delle percentuali di traffico deve essere pari a 100 oppure la mappa deve essere vuota se l'endpoint non accetta traffico in quel momento. La chiave per il modello di cui viene eseguito il deployment è "0". Ad esempio: {"0": 100}.

  • (Facoltativo) MACHINE_TYPE: specifica le risorse di computing.

invia una richiesta di previsione

Invia una richiesta di previsione all'endpoint. Per ulteriori informazioni, consulta l'articolo Ottenere previsioni online da un modello addestrato personalizzato.

pred_request = [
    [ 1.7076793 , 0.23412449, 0.95170785, -0.10901471],
    [-0.81881499, 0.43874669, -0.25108584, 1.75536031]
]

endpoint.predict(pred_request)

Dovrebbe essere restituito un output simile al seguente:

Prediction(predictions=[0.7891440987586975, 0.5843208432197571],
 deployed_model_id='3829557218101952512',
 model_version_id='1',
 model_resource_name='projects/123456789/locations/us-central1/models/123456789101112',
 explanations=None)

Passaggi successivi