Déployer un modèle sur Vertex AI et obtenir des prédictions

Après avoir entraîné un modèle sur un cluster Ray sur Vertex AI, vous pouvez déployer le modèle pour les requêtes de prédiction en ligne en procédant comme suit :

Avant de commencer, assurez-vous de lire la présentation de Ray sur Vertex AI et de configurer tous les outils prérequis.

Les étapes de cette section supposent que vous utilisez le SDK Ray sur Vertex AI dans un environnement Python interactif.

Importer et initialiser le client Ray sur Vertex AI

Si vous êtes déjà connecté à votre cluster Ray sur Vertex AI, redémarrez votre kernel et exécutez le code suivant. La variable runtime_env est nécessaire au moment de la connexion pour exécuter des commandes de prédiction en ligne.

import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

# Initialize Vertex AI to retrieve projects for downstream operations.
aiplatform.init(staging_bucket=BUCKET_URI)

# Shutdown cluster and reconnect with required dependencies in the runtime_env.
ray.shutdown()

Où :

  • CLUSTER_RESOURCE_NAME : nom complet de la ressource Ray sur Vertex AI, qui doit être unique dans votre projet.

  • BUCKET_URI : bucket Cloud Storage pour stocker les artefacts de modèle.

Entraîner et exporter le modèle vers Vertex AI Model Registry

Exportez le modèle Vertex AI à partir du point de contrôle Ray et importez-le dans Vertex AI Model Registry.

TensorFlow

import numpy as np
from ray.air import session, CheckpointConfig, ScalingConfig
from ray.air.config import RunConfig
from ray.train import SyncConfig
from ray.train.tensorflow import TensorflowCheckpoint, TensorflowTrainer
from ray import train
import tensorflow as tf

from vertex_ray.predict import tensorflow

# Required dependencies at runtime
runtime_env = {
  "pip": [
      "ray==2.9.3", # pin the Ray version to prevent it from being overwritten
      "tensorflow",
      "IPython",
      "numpy",
  ],
}

# Initialize  Ray on Vertex AI client for remote cluster connection
ray.init(address=address, runtime_env=runtime_env)

# Define a TensorFlow model.

def create_model():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))])
  model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"])
  return model

def train_func(config):
  n = 100
  # Create a fake dataset
  # data   : X - dim = (n, 4)
  # target : Y - dim = (n, 1)
  X = np.random.normal(0, 1, size=(n, 4))
  Y = np.random.uniform(0, 1, size=(n, 1))

  strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
  with strategy.scope():
      model = create_model()
      print(model)

  for epoch in range(config["num_epochs"]):
      model.fit(X, Y, batch_size=20)
      model.save("temp/my_model")
      checkpoint = TensorflowCheckpoint.from_saved_model("temp/my_model")
      train.report({}, checkpoint=checkpoint)

trainer = TensorflowTrainer(
  train_func,
  train_loop_config={"num_epochs": 5},
  scaling_config=ScalingConfig(num_workers=1),
  run_config=RunConfig(
      storage_path=f'{BUCKET_URI}/ray_results/tensorflow',
      checkpoint_config=CheckpointConfig(
          num_to_keep=1  # Keep all checkpoints.
      ),
      sync_config=SyncConfig(
          sync_artifacts=True,
      ),
  ),
)

# Train the model.
result = trainer.fit()

# Register the trained model to Vertex AI Model Registry.
vertex_model = tensorflow.register_tensorflow(
  result.checkpoint,
)

sklearn

from vertex_ray.predict import sklearn
from ray.train.sklearn import SklearnCheckpoint

vertex_model = sklearn.register_sklearn(
  result.checkpoint,
)

XGBoost

from vertex_ray.predict import xgboost
from ray.train.xgboost import XGBoostTrainer

# Initialize  Ray on Vertex AI client for remote cluster connection
ray.init(address=address, runtime_env=runtime_env)

# Define a XGBoost model.
train_dataset = ray.data.from_pandas(
pd.DataFrame([{"x": x, "y": x + 1} for x in range(32)]))

run_config = RunConfig(
storage_path=f'{BUCKET_URI}/ray_results/xgboost',
checkpoint_config=CheckpointConfig(
    num_to_keep=1  # Keep all checkpoints.
),
sync_config=SyncConfig(sync_artifacts=True),
)

trainer = XGBoostTrainer(
label_column="y",
params={"objective": "reg:squarederror"},
scaling_config=ScalingConfig(num_workers=3),
datasets={"train": train_dataset},
run_config=run_config,
)
# Train the model.
result = trainer.fit()

# Register the trained model to Vertex AI Model Registry.
vertex_model = xgboost.register_xgboost(
result.checkpoint,
)

PyTorch

  • Convertissez les points de contrôle Ray en modèle.

  • Créez model.mar.

  • Créez un LocalModel à l'aide de model.mar.

  • Importer dans Vertex AI Model Registry

Déployer le modèle pour les prédictions en ligne

Déployez le modèle sur le point de terminaison. Pour en savoir plus, consultez Déployer un modèle sur un point de terminaison.

DEPLOYED_NAME = model.display_name + "-endpoint"
TRAFFIC_SPLIT = {"0": 100}
MACHINE_TYPE = "n1-standard-4"

endpoint = vertex_model.deploy(
    deployed_model_display_name=DEPLOYED_NAME,
    traffic_split=TRAFFIC_SPLIT,
    machine_type=MACHINE_TYPE,
)

Où :

  • (Facultatif) DEPLOYED_NAME : nom à afficher du modèle déployé. S'il n'est pas fourni lors de la création, la valeur display_name du modèle est utilisée.

  • (Facultatif) TRAFFIC_SPLIT : mappage de l'ID d'un modèle déployé au pourcentage du trafic de ce point de terminaison à transférer vers ce modèle déployé. Si l'ID d'un modèle déployé ne figure pas dans ce mappage, il ne reçoit aucun trafic. La somme des valeurs de pourcentage du trafic doit être égale à 100, ou le mappage doit être vide si le point de terminaison n'accepte aucun trafic pour le moment. La clé du modèle déployé est "0". Exemple : {"0": 100}.

  • (Facultatif) MACHINE_TYPE : spécifiez les ressources de calcul.

Envoyer une requête de prédiction

Envoyez une requête de prédiction au point de terminaison. Pour en savoir plus, consultez la section Obtenir des prédictions en ligne à partir d'un modèle entraîné personnalisé.

pred_request = [
    [ 1.7076793 , 0.23412449, 0.95170785, -0.10901471],
    [-0.81881499, 0.43874669, -0.25108584, 1.75536031]
]

endpoint.predict(pred_request)

Vous devriez obtenir un résultat semblable à celui-ci :

Prediction(predictions=[0.7891440987586975, 0.5843208432197571],
 deployed_model_id='3829557218101952512',
 model_version_id='1',
 model_resource_name='projects/123456789/locations/us-central1/models/123456789101112',
 explanations=None)

Étapes suivantes