Implantar um modelo na Vertex AI e receber previsões

Depois de treinar um modelo em um cluster do Ray na Vertex AI, é possível implantar o modelo para solicitações de previsão on-line usando o seguinte processo:

Before you begin, make sure to read the Ray on Vertex AI overview and set up all the prerequisite tools you need.

As etapas desta seção pressupõem que você esteja usando o SDK Ray na Vertex AI em um ambiente Python interativo.

Importar e inicializar o Ray no cliente da Vertex AI

Se você já estiver conectado ao cluster do Ray na Vertex AI, reinicie o kernel e execute o código a seguir. A variável runtime_env é necessária no momento da conexão, para executar comandos de previsão on-line.

import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

# Initialize Vertex AI to retrieve projects for downstream operations.
aiplatform.init(staging_bucket=BUCKET_URI)

# Shutdown cluster and reconnect with required dependencies in the runtime_env.
ray.shutdown()

Em que:

  • CLUSTER_RESOURCE_NAME: o nome completo do recurso do cluster Ray na Vertex AI que precisa ser exclusivo em todo o projeto.

  • BUCKET_URI é o bucket do Cloud Storage para armazenar os artefatos do modelo.

Treinar e exportar o modelo para o Vertex AI Model Registry

Exportar o modelo da Vertex AI do checkpoint do Ray e fazer o upload dele para o Vertex AI Model Registry.

TensorFlow.

import numpy as np
from ray.air import session, CheckpointConfig, ScalingConfig
from ray.air.config import RunConfig
from ray.train import SyncConfig
from ray.train.tensorflow import TensorflowCheckpoint, TensorflowTrainer
from ray import train
import tensorflow as tf

from vertex_ray.predict import tensorflow

# Required dependencies at runtime
runtime_env = {
  "pip": [
      "ray==2.9.3", # pin the Ray version to prevent it from being overwritten
      "tensorflow",
      "IPython",
      "numpy",
  ],
}

# Initialize  Ray on Vertex AI client for remote cluster connection
ray.init(address=address, runtime_env=runtime_env)

# Define a TensorFlow model.

def create_model():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))])
  model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"])
  return model

def train_func(config):
  n = 100
  # Create a fake dataset
  # data   : X - dim = (n, 4)
  # target : Y - dim = (n, 1)
  X = np.random.normal(0, 1, size=(n, 4))
  Y = np.random.uniform(0, 1, size=(n, 1))

  strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
  with strategy.scope():
      model = create_model()
      print(model)

  for epoch in range(config["num_epochs"]):
      model.fit(X, Y, batch_size=20)
      model.save("temp/my_model")
      checkpoint = TensorflowCheckpoint.from_saved_model("temp/my_model")
      train.report({}, checkpoint=checkpoint)

trainer = TensorflowTrainer(
  train_func,
  train_loop_config={"num_epochs": 5},
  scaling_config=ScalingConfig(num_workers=1),
  run_config=RunConfig(
      storage_path=f'{BUCKET_URI}/ray_results/tensorflow',
      checkpoint_config=CheckpointConfig(
          num_to_keep=1  # Keep all checkpoints.
      ),
      sync_config=SyncConfig(
          sync_artifacts=True,
      ),
  ),
)

# Train the model.
result = trainer.fit()

# Register the trained model to Vertex AI Model Registry.
vertex_model = tensorflow.register_tensorflow(
  result.checkpoint,
)

sklearn

from vertex_ray.predict import sklearn
from ray.train.sklearn import SklearnCheckpoint

vertex_model = sklearn.register_sklearn(
  result.checkpoint,
)

XGBoost

from vertex_ray.predict import xgboost
from ray.train.xgboost import XGBoostTrainer

# Initialize  Ray on Vertex AI client for remote cluster connection
ray.init(address=address, runtime_env=runtime_env)

# Define a XGBoost model.
train_dataset = ray.data.from_pandas(
pd.DataFrame([{"x": x, "y": x + 1} for x in range(32)]))

run_config = RunConfig(
storage_path=f'{BUCKET_URI}/ray_results/xgboost',
checkpoint_config=CheckpointConfig(
    num_to_keep=1  # Keep all checkpoints.
),
sync_config=SyncConfig(sync_artifacts=True),
)

trainer = XGBoostTrainer(
label_column="y",
params={"objective": "reg:squarederror"},
scaling_config=ScalingConfig(num_workers=3),
datasets={"train": train_dataset},
run_config=run_config,
)
# Train the model.
result = trainer.fit()

# Register the trained model to Vertex AI Model Registry.
vertex_model = xgboost.register_xgboost(
result.checkpoint,
)

PyTorch

  • Converta os checkpoints do Ray em um modelo.

  • Crie o model.mar.

  • Crie o LocalModel usando model.mar.

  • Faça upload para o Vertex AI Model Registry.

Implantar o modelo para previsões on-line

Implante o modelo no endpoint on-line. Para mais informações, consulte Implantar o modelo em um endpoint.

DEPLOYED_NAME = model.display_name + "-endpoint"
TRAFFIC_SPLIT = {"0": 100}
MACHINE_TYPE = "n1-standard-4"

endpoint = vertex_model.deploy(
    deployed_model_display_name=DEPLOYED_NAME,
    traffic_split=TRAFFIC_SPLIT,
    machine_type=MACHINE_TYPE,
)

Em que:

  • (Opcional) DEPLOYED_NAME: o nome de exibição do modelo implantado. Se não for fornecido na criação, o display_name do modelo será usado.

  • (Opcional) TRAFFIC_SPLIT: um mapa do ID de um modelo implantado para a porcentagem do tráfego desse endpoint que precisa ser encaminhado para esse modelo implantado. Se o ID de um modelo implantado não estiver listado nesse mapa, ele não receberá tráfego. Os valores de porcentagem de tráfego precisam totalizar 100 ou o mapa precisa estar vazio se o endpoint não aceitar tráfego no momento. A chave do modelo que está sendo implantado é "0". Por exemplo, {"0": 100}.

  • (Opcional) MACHINE_TYPE: especifique os recursos de computação.

Fazer uma solicitação de previsão

Envie uma solicitação de previsão para o endpoint. Para mais informações, consulte Receber previsões on-line de um modelo treinado personalizado.

pred_request = [
    [ 1.7076793 , 0.23412449, 0.95170785, -0.10901471],
    [-0.81881499, 0.43874669, -0.25108584, 1.75536031]
]

endpoint.predict(pred_request)

O resultado será semelhante a este:

Prediction(predictions=[0.7891440987586975, 0.5843208432197571],
 deployed_model_id='3829557218101952512',
 model_version_id='1',
 model_resource_name='projects/123456789/locations/us-central1/models/123456789101112',
 explanations=None)

A seguir