Vertex AI にモデルをデプロイして予測を取得する

Vertex AI 上の Ray クラスタでモデルをトレーニングした後、次の手順でオンライン予測リクエスト用のモデルをデプロイできます。

始める前に、Ray on Vertex AI の概要を読み、前提条件となるすべてのツールを設定してください。

このセクションの手順では、インタラクティブな Python 環境で Ray on Vertex AI SDK を使用していることを前提としています。

Ray on Vertex AI クライアントをインポートして初期化する

Vertex AI 上の Ray クラスタにすでに接続している場合は、カーネルを再起動して次のコードを実行します。runtime_env 変数は、接続時にオンライン予測コマンドを実行するために必要です。

import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

# Initialize Vertex AI to retrieve projects for downstream operations.
aiplatform.init(staging_bucket=BUCKET_URI)

# Shutdown cluster and reconnect with required dependencies in the runtime_env.
ray.shutdown()

ここで

  • CLUSTER_RESOURCE_NAME: プロジェクト全体で一意である必要がある、Vertex AI クラスタ上の Ray の完全なリソース名。

  • BUCKET_URI は、モデル アーティファクトを保存する Cloud Storage バケットです。

モデルをトレーニングして Vertex AI Model Registry にエクスポートする

Ray チェックポイントから Vertex AI モデルをエクスポートし、モデルを Vertex AI Model Registry にアップロードします。

TensorFlow

import numpy as np
from ray.air import session, CheckpointConfig, ScalingConfig
from ray.air.config import RunConfig
from ray.train.tensorflow import TensorflowCheckpoint, TensorflowTrainer
from ray.tune.syncer import SyncConfig
import tensorflow as tf

from vertex_ray.predict import tensorflow

# Required dependencies at runtime
runtime_env = {
  "pip": [
      "ray==2.4.0", # pin the Ray version to prevent it from being overwritten
      "tensorflow",
      "IPython",
      "numpy",
  ],
}

# Initialize  Ray on Vertex AI client for remote cluster connection
ray.init(address=address, runtime_env=runtime_env)

# Define a TensorFlow model.

def create_model():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))])
  model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"])
  return model

def train_func(config):
  n = 100
  # Create a fake dataset
  # data   : X - dim = (n, 4)
  # target : Y - dim = (n, 1)
  X = np.random.normal(0, 1, size=(n, 4))
  Y = np.random.uniform(0, 1, size=(n, 1))

  strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
  with strategy.scope():
      model = create_model()
      print(model)

  for epoch in range(config["num_epochs"]):
      model.fit(X, Y, batch_size=20)
      model.save("temp/my_model")
      checkpoint = TensorflowCheckpoint.from_saved_model("temp/my_model")
      session.report({}, checkpoint=checkpoint)

trainer = TensorflowTrainer(
  train_func,
  train_loop_config={"num_epochs": 5},
  scaling_config=ScalingConfig(num_workers=1),
  run_config=RunConfig(
      checkpoint_config=CheckpointConfig(
          num_to_keep=1  # Keep all checkpoints.
      ),
      sync_config=SyncConfig(
          upload_dir=f'{BUCKET_URI}/ray_results/tensorflow',
      ),
  ),
)

# Train the model.
result = trainer.fit()

# Register the trained model to Vertex AI Model Registry.
vertex_model = tensorflow.register_tensorflow(
  result.checkpoint,
)

sklearn

from vertex_ray.predict import sklearn
from ray.train.sklearn import SklearnCheckpoint

vertex_model = sklearn.register_sklearn(
  SklearnCheckpoint.from_checkpoint(result.checkpoint)
)

XGBoost

from vertex_ray.predict import xgboost
from ray.train.xgboost import XGBoostCheckpoint

vertex_model = xgboost.register_xgboost(
  XGBoostCheckpoint.from_checkpoint(result.checkpoint)
)

モデルをデプロイする

オンライン エンドポイントにモデルをデプロイします。詳細については、エンドポイントにモデルをデプロイするをご覧ください。

DEPLOYED_NAME = model.display_name + "-endpoint"
TRAFFIC_SPLIT = {"0": 100}
MACHINE_TYPE = "n1-standard-4"

endpoint = vertex_model.deploy(
    deployed_model_display_name=DEPLOYED_NAME,
    traffic_split=TRAFFIC_SPLIT,
    machine_type=MACHINE_TYPE,
)

ここで

  • (省略可)DEPLOYED_NAME: デプロイされたモデルの表示名。作成時に指定されていない場合は、モデルの display_name が使用されます。

  • (省略可)TRAFFIC_SPLIT: デプロイ済みモデルの ID から、そのデプロイ済みモデルに転送する必要があるこのエンドポイントのトラフィックの割合へのマップ。デプロイされたモデルの ID がこのマップに一覧表示されていない場合、トラフィックは受信されていません。トラフィックの割合の値は合計で 100 となる必要があります。また、エンドポイントがその時点で一切のトラフィックを受け付けない場合はマップを空白にする必要があります。デプロイされるモデルのキーは "0" です。例: {"0": 100}

  • (省略可)MACHINE_TYPE: コンピューティング リソースを指定します

予測リクエストを行う

予測リクエストをエンドポイントに送信します。詳細については、カスタム トレーニング済みモデルからオンライン予測を取得するをご覧ください。

pred_request = [
    [ 1.7076793 , 0.23412449, 0.95170785, -0.10901471],
    [-0.81881499, 0.43874669, -0.25108584, 1.75536031]
]

endpoint.predict(pred_request)

出力は次のようになります。

Prediction(predictions=[0.7891440987586975, 0.5843208432197571],
 deployed_model_id='3829557218101952512',
 model_version_id='1',
 model_resource_name='projects/123456789/locations/us-central1/models/123456789101112',
 explanations=None)

次のステップ