Build a pipeline for continuous model training

This document walks you through the steps required to build a pipeline that automatically trains a custom model whenever new data is inserted into a dataset, using Vertex AI Pipelines and Cloud Functions.

Objectives

The following steps cover this process:

  1. Acquire and prepare a dataset in BigQuery.

  2. Create and upload a custom training package. When executed, it reads data from the dataset and trains the model.

  3. Build a Vertex AI Pipeline. This pipeline executes the custom training package, uploads the model to Vertex AI Model Registry, runs the evaluation job, and sends an email notification.

  4. Manually run the pipeline.

  5. Create a Cloud Function with an Eventarc trigger that runs the pipeline whenever new data is inserted into the BigQuery dataset.

Before you begin

Set up your project and notebook.

Project setup

  1. In the Google Cloud console, go to the project selector page.

    Go to project selector

  2. Select or create a Google Cloud project.

  3. Make sure that billing is enabled for your Google Cloud project.

Create a notebook

You use a Colab Enterprise notebook to run some of the code in this tutorial.

  1. If you aren't the project owner, have the project owner grant you the roles/resourcemanager.projectIamAdmin and roles/aiplatform.colabEnterpriseUser IAM roles.

    You need these roles to use Colab Enterprise and to grant IAM roles and permissions to yourself and to service accounts.

    Go to IAM

  2. In the Google Cloud console, go to the Colab Enterprise Notebooks page.

    Colab Enterprise prompts you to enable the following required APIs if they aren't already enabled:

    • Vertex AI API
    • Dataform API
    • Compute Engine API

    Go to Colab Enterprise

  3. In the Region menu, select the region where you want to create your notebook. If you're not sure, use us-central1 as the region.

    Use the same region for all of the resources in this tutorial.

  4. Click Create a new notebook.

Your new notebook appears on the My notebooks tab. To run code in your notebook, add a code cell and click the Run cell button.

Set up your development environment

  1. In your notebook, install the following Python3 packages.

    ! pip3 install  google-cloud-aiplatform==1.34.0 \
                    google-cloud-pipeline-components==2.6.0 \
                    kfp==2.4.0 \
                    scikit-learn==1.0.2 \
                    mlflow==2.10.0
    
  2. Set the Google Cloud CLI project by running the following command:

    PROJECT_ID = "PROJECT_ID"
    
    # Set the project id
    ! gcloud config set project {PROJECT_ID}
    

    Replace PROJECT_ID with your project ID. If necessary, you can find your project ID in the Google Cloud console.

  3. Grant roles to your Google Account:

    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/bigquery.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/aiplatform.user
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/storage.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/pubsub.editor
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/cloudfunctions.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/logging.viewer
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/logging.configWriter
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountUser
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/eventarc.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/aiplatform.colabEnterpriseUser
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/artifactregistry.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/serviceusage.serviceUsageAdmin
    
  4. Enable the following APIs

    • Artifact Registry API
    • BigQuery API
    • Cloud Build API
    • Cloud Functions API
    • Cloud Logging API
    • Pub/Sub API
    • Cloud Run Admin API
    • Cloud Storage API
    • Eventarc API
    • Service Usage API
    • Vertex AI API
    ! gcloud services enable artifactregistry.googleapis.com bigquery.googleapis.com cloudbuild.googleapis.com cloudfunctions.googleapis.com logging.googleapis.com pubsub.googleapis.com run.googleapis.com storage-component.googleapis.com  eventarc.googleapis.com serviceusage.googleapis.com aiplatform.googleapis.com
    

  5. Grant roles to your project's service account:

    1. View the service account names

      ! gcloud iam service-accounts list
      

      Note the name of your Compute service agent. It should be in the format PROJECT_NUMBER-compute@developer.gserviceaccount.com.

    2. Grant the required roles to the service agent.

      ! gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SA_ID-compute@developer.gserviceaccount.com" --role=roles/aiplatform.serviceAgent
      ! gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SA_ID-compute@developer.gserviceaccount.com" --role=roles/eventarc.eventReceiver
      

Acquire and prepare the dataset

In this tutorial, you build a model that predicts taxi trip fares based on features such as trip time, location, and distance. You use data from the public Chicago Taxi Trips dataset. This dataset includes taxi trips from 2013 to the present, reported to the City of Chicago in its role as a regulatory agency. To protect the privacy of drivers and riders while still letting aggregators analyze the data, the Taxi ID is kept consistent for any given taxi medallion number but doesn't show the number itself, census tracts are suppressed in some cases, and times are rounded to the nearest 15 minutes.

For more information, see Chicago Taxi Trips in Marketplace.

Create a BigQuery dataset

  1. In the Google Cloud console, go to BigQuery Studio.

    Go to BigQuery

  2. In the Explorer panel, find your project, click Actions, and then click Create dataset.

  3. On the Create dataset page:

    • For Dataset ID, enter mlops. For more information, see dataset naming.

    • For Location type, choose your multi-region. For example, choose US (multiple regions in United States) if you're using us-central1. After a dataset is created, the location can't be changed.

    • Click Create dataset.

For more information, see how to create datasets.
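
If you prefer to create the dataset from your notebook instead of the console, a minimal sketch using the google-cloud-bigquery client looks like the following. It assumes the PROJECT_ID variable defined earlier and the US multi-region used in this tutorial.

    # Optional sketch: create the mlops dataset from the notebook instead of the console.
    # Assumes PROJECT_ID is already set; US is the multi-region that matches us-central1 resources.
    from google.cloud import bigquery

    bq_client = bigquery.Client(project=PROJECT_ID)
    dataset = bigquery.Dataset(f"{PROJECT_ID}.mlops")
    dataset.location = "US"  # the location can't be changed after creation
    bq_client.create_dataset(dataset, exists_ok=True)
    print(f"Dataset {dataset.dataset_id} is ready in {dataset.location}")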

Create and populate a BigQuery table

In this section, you create the table and import one year of data from the public dataset into your project's dataset.

  1. Go to BigQuery Studio

    Go to BigQuery

  2. Click Create SQL Query and run the following SQL query by clicking Run.

    CREATE OR REPLACE TABLE `PROJECT_ID.mlops.chicago`
    AS (
        WITH
          taxitrips AS (
          SELECT
            trip_start_timestamp,
            trip_end_timestamp,
            trip_seconds,
            trip_miles,
            payment_type,
            pickup_longitude,
            pickup_latitude,
            dropoff_longitude,
            dropoff_latitude,
            tips,
            tolls,
            fare,
            pickup_community_area,
            dropoff_community_area,
            company,
            unique_key
          FROM
            `bigquery-public-data.chicago_taxi_trips.taxi_trips`
          WHERE pickup_longitude IS NOT NULL
          AND pickup_latitude IS NOT NULL
          AND dropoff_longitude IS NOT NULL
          AND dropoff_latitude IS NOT NULL
          AND trip_miles > 0
          AND trip_seconds > 0
          AND fare > 0
          AND EXTRACT(YEAR FROM trip_start_timestamp) = 2019
        )
    
        SELECT
          trip_start_timestamp,
          EXTRACT(MONTH from trip_start_timestamp) as trip_month,
          EXTRACT(DAY from trip_start_timestamp) as trip_day,
          EXTRACT(DAYOFWEEK from trip_start_timestamp) as trip_day_of_week,
          EXTRACT(HOUR from trip_start_timestamp) as trip_hour,
          trip_seconds,
          trip_miles,
          payment_type,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1)
          ) AS pickup_grid,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1)
          ) AS dropoff_grid,
          ST_Distance(
              ST_GeogPoint(pickup_longitude, pickup_latitude),
              ST_GeogPoint(dropoff_longitude, dropoff_latitude)
          ) AS euclidean,
          CONCAT(
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(pickup_longitude,
                  pickup_latitude), 0.1)),
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(dropoff_longitude,
                  dropoff_latitude), 0.1))
          ) AS loc_cross,
          IF((tips/fare >= 0.2), 1, 0) AS tip_bin,
          tips,
          tolls,
          fare,
          pickup_longitude,
          pickup_latitude,
          dropoff_longitude,
          dropoff_latitude,
          pickup_community_area,
          dropoff_community_area,
          company,
          unique_key,
          trip_end_timestamp
        FROM
          taxitrips
        LIMIT 1000000
    )
    

    This query creates the <PROJECT_ID>.mlops.chicago table and populates it with data from the public bigquery-public-data.chicago_taxi_trips.taxi_trips table.

  3. To view the table's schema, click Go to table and then click the Schema tab.

  4. To see the table's contents, click the Preview tab.
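
To sanity-check the import from your notebook rather than the Preview tab, you can also run a quick row count. This is a sketch that assumes PROJECT_ID is defined in the notebook as in the earlier steps.

    # Optional sketch: count the imported rows in mlops.chicago.
    from google.cloud import bigquery

    bq_client = bigquery.Client(project=PROJECT_ID)
    query = f"SELECT COUNT(*) AS n FROM `{PROJECT_ID}.mlops.chicago`"
    row_count = next(iter(bq_client.query(query).result())).n
    print(f"mlops.chicago contains {row_count} rows")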

Create and upload a custom training package

In this section, you create a Python package that contains the code that reads the dataset, splits the data into training and test sets, and trains your custom model. The package runs as one of the tasks in your pipeline. For more information, see building a Python training application for a prebuilt container.

Create the custom training package

  1. In your Colab notebook, create parent folders for the training application:

    !mkdir -p training_package/trainer
    
  2. Create an __init__.py file in each folder to make it a package, using the following command:

    ! touch training_package/__init__.py
    ! touch training_package/trainer/__init__.py
    

    You can see the new files and folders in the Files panel.

  3. In the Files panel, create a file named task.py in the training_package/trainer folder with the following contents.

    # Import the libraries
    from sklearn.model_selection import train_test_split, cross_val_score
    from sklearn.preprocessing import OneHotEncoder, StandardScaler
    from google.cloud import bigquery, bigquery_storage
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.compose import ColumnTransformer
    from sklearn.pipeline import Pipeline
    from google import auth
    from scipy import stats
    import numpy as np
    import argparse
    import joblib
    import pickle
    import csv
    import os
    
    # add parser arguments
    parser = argparse.ArgumentParser()
    parser.add_argument('--project-id', dest='project_id',  type=str, help='Project ID.')
    parser.add_argument('--training-dir', dest='training_dir', default=os.getenv("AIP_MODEL_DIR"),
                        type=str, help='Dir to save the data and the trained model.')
    parser.add_argument('--bq-source', dest='bq_source',  type=str, help='BigQuery data source for training data.')
    args = parser.parse_args()
    
    # data preparation code
    BQ_QUERY = """
    with tmp_table as (
    SELECT trip_seconds, trip_miles, fare,
        tolls,  company,
        pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude,
        DATETIME(trip_start_timestamp, 'America/Chicago') trip_start_timestamp,
        DATETIME(trip_end_timestamp, 'America/Chicago') trip_end_timestamp,
        CASE WHEN (pickup_community_area IN (56, 64, 76)) OR (dropoff_community_area IN (56, 64, 76)) THEN 1 else 0 END is_airport,
    FROM `{}`
    WHERE
      dropoff_latitude IS NOT NULL and
      dropoff_longitude IS NOT NULL and
      pickup_latitude IS NOT NULL and
      pickup_longitude IS NOT NULL and
      fare > 0 and
      trip_miles > 0
      and MOD(ABS(FARM_FINGERPRINT(unique_key)), 100) between 0 and 99
    ORDER BY RAND()
    LIMIT 10000)
    SELECT *,
        EXTRACT(YEAR FROM trip_start_timestamp) trip_start_year,
        EXTRACT(MONTH FROM trip_start_timestamp) trip_start_month,
        EXTRACT(DAY FROM trip_start_timestamp) trip_start_day,
        EXTRACT(HOUR FROM trip_start_timestamp) trip_start_hour,
        FORMAT_DATE('%a', DATE(trip_start_timestamp)) trip_start_day_of_week
    FROM tmp_table
    """.format(args.bq_source)
    # Get default credentials
    credentials, project = auth.default()
    bqclient = bigquery.Client(credentials=credentials, project=args.project_id)
    bqstorageclient = bigquery_storage.BigQueryReadClient(credentials=credentials)
    df = (
        bqclient.query(BQ_QUERY)
        .result()
        .to_dataframe(bqstorage_client=bqstorageclient)
    )
    # Add 'N/A' for missing 'Company'
    df.fillna(value={'company':'N/A','tolls':0}, inplace=True)
    # Drop rows containing null data.
    df.dropna(how='any', axis='rows', inplace=True)
    # Pickup and dropoff locations distance
    df['abs_distance'] = (np.hypot(df['dropoff_latitude']-df['pickup_latitude'], df['dropoff_longitude']-df['pickup_longitude']))*100
    
    # Remove extremes, outliers
    possible_outliers_cols = ['trip_seconds', 'trip_miles', 'fare', 'abs_distance']
    df=df[(np.abs(stats.zscore(df[possible_outliers_cols].astype(float))) < 3).all(axis=1)].copy()
    # Reduce location accuracy
    df=df.round({'pickup_latitude': 3, 'pickup_longitude': 3, 'dropoff_latitude':3, 'dropoff_longitude':3})
    
    # Drop the timestamp col
    X=df.drop(['trip_start_timestamp', 'trip_end_timestamp'],axis=1)
    
    # Split the data into train and test
    X_train, X_test = train_test_split(X, test_size=0.10, random_state=123)
    
    ## Format the data for batch predictions
    # select string cols
    string_cols = X_test.select_dtypes(include='object').columns
    # Add quotes around string fields
    X_test[string_cols] = X_test[string_cols].apply(lambda x: '\"' + x + '\"')
    # Add quotes around column names
    X_test.columns = ['\"' + col + '\"' for col in X_test.columns]
    # Save DataFrame to csv
    X_test.to_csv(os.path.join(args.training_dir,"test.csv"),index=False,quoting=csv.QUOTE_NONE, escapechar=' ')
    # Save test data without the target for batch predictions
    X_test.drop('\"fare\"',axis=1,inplace=True)
    X_test.to_csv(os.path.join(args.training_dir,"test_no_target.csv"),index=False,quoting=csv.QUOTE_NONE, escapechar=' ')
    
    # Separate the target column
    y_train=X_train.pop('fare')
    # Get the column indexes
    col_index_dict = {col: idx for idx, col in enumerate(X_train.columns)}
    # Create a column transformer pipeline
    ct_pipe = ColumnTransformer(transformers=[
        ('hourly_cat', OneHotEncoder(categories=[range(0,24)], sparse = False), [col_index_dict['trip_start_hour']]),
        ('dow', OneHotEncoder(categories=[['Mon', 'Tue', 'Sun', 'Wed', 'Sat', 'Fri', 'Thu']], sparse = False), [col_index_dict['trip_start_day_of_week']]),
        ('std_scaler', StandardScaler(), [
            col_index_dict['trip_start_year'],
            col_index_dict['abs_distance'],
            col_index_dict['pickup_longitude'],
            col_index_dict['pickup_latitude'],
            col_index_dict['dropoff_longitude'],
            col_index_dict['dropoff_latitude'],
            col_index_dict['trip_miles'],
            col_index_dict['trip_seconds']])
    ])
    # Add the random-forest estimator to the pipeline
    rfr_pipe = Pipeline([
        ('ct', ct_pipe),
        ('forest_reg', RandomForestRegressor(
            n_estimators = 20,
            max_features = 1.0,
            n_jobs = -1,
            random_state = 3,
            max_depth=None,
            max_leaf_nodes=None,
        ))
    ])
    
    # train the model
    rfr_score = cross_val_score(rfr_pipe, X_train, y_train, scoring = 'neg_mean_squared_error', cv = 5)
    rfr_rmse = np.sqrt(-rfr_score)
    print ("Crossvalidation RMSE:",rfr_rmse.mean())
    final_model=rfr_pipe.fit(X_train, y_train)
    # Save the model pipeline
    with open(os.path.join(args.training_dir,"model.pkl"), 'wb') as model_file:
        pickle.dump(final_model, model_file)
    

    The code accomplishes the following tasks:

    1. Feature selection.
    2. Transforming the pickup and drop-off times from UTC to Chicago local time.
    3. Extracting the date, hour, day, month, and year from the pickup time.
    4. Calculating the trip duration using the start and end times.
    5. Identifying and flagging trips that started or ended at an airport based on the community areas.
    6. Training a Random Forest regression model to predict the taxi trip fare, using the scikit-learn framework.
    7. Saving the trained model to a pickle file, model.pkl.

      The selected approach and feature engineering are based on data exploration and analysis of Predicting Chicago Taxi Fares.

  4. In the Files panel, create a file named setup.py in the training_package folder with the following contents.

    from setuptools import find_packages
    from setuptools import setup
    
    REQUIRED_PACKAGES=["google-cloud-bigquery[pandas]","google-cloud-bigquery-storage"]
    setup(
        name='trainer',
        version='0.1',
        install_requires=REQUIRED_PACKAGES,
        packages=find_packages(),
        include_package_data=True,
        description='Training application package for chicago taxi trip fare prediction.'
    )
    
  5. In your notebook, run setup.py to create the source distribution for your training application:

    ! cd training_package && python setup.py sdist --formats=gztar && cd ..
    

At the end of this section, the Files panel contains the following files and folders under training_package.

dist
  trainer-0.1.tar.gz
trainer
  __init__.py
  task.py
trainer.egg-info
__init__.py
setup.py

Upload the custom training package to Cloud Storage

  1. Create a Cloud Storage bucket.

    REGION="REGION"
    BUCKET_NAME = "BUCKET_NAME"
    BUCKET_URI = f"gs://{BUCKET_NAME}"
    
    ! gsutil mb -l $REGION -p $PROJECT_ID $BUCKET_URI
    

    Replace the following parameter values:

    • REGION: Choose the same region as the one you chose when creating your Colab notebook.

    • BUCKET_NAME: The bucket name.

  2. Upload your training package to the Cloud Storage bucket.

    # Copy the training package to the bucket
    ! gsutil cp training_package/dist/trainer-0.1.tar.gz $BUCKET_URI/
    

Build your pipeline

A pipeline is a description of an MLOps workflow as a graph of steps called pipeline tasks.

In this section, you define your pipeline tasks, compile them to YAML, and register your pipeline in Artifact Registry so that it can be version-controlled and run multiple times, by a single user or by multiple users.

Here's a visualization of the tasks in our pipeline, including model training, model upload, model evaluation, and email notification:

Pipeline visualization

For more information, see creating pipeline templates.
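
As a minimal illustration of how KFP expresses a workflow as a graph of tasks, two lightweight components can be chained as shown below. The component and pipeline names here are illustrative only and are not part of this tutorial's pipeline, which is defined later in this section.

    # Minimal KFP sketch: two components chained into a tiny pipeline graph.
    from kfp import dsl

    @dsl.component(base_image="python:3.10")
    def make_message(name: str) -> str:
        return f"Hello, {name}"

    @dsl.component(base_image="python:3.10")
    def print_message(message: str):
        print(message)

    @dsl.pipeline(name="hello-graph")
    def hello_pipeline(name: str = "Vertex AI"):
        msg_task = make_message(name=name)
        # Passing the output creates the edge between the two tasks in the graph.
        print_message(message=msg_task.output)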

Define constants and initialize clients

  1. In your notebook, define the constants that are used in later steps:

    import os
    
    EMAIL_RECIPIENTS = [ "NOTIFY_EMAIL" ]
    PIPELINE_ROOT = "{}/pipeline_root/chicago-taxi-pipe".format(BUCKET_URI)
    PIPELINE_NAME = "vertex-pipeline-datatrigger-tutorial"
    WORKING_DIR = f"{PIPELINE_ROOT}/mlops-datatrigger-tutorial"
    os.environ['AIP_MODEL_DIR'] = WORKING_DIR
    EXPERIMENT_NAME = PIPELINE_NAME + "-experiment"
    PIPELINE_FILE = PIPELINE_NAME + ".yaml"
    

    Replace NOTIFY_EMAIL with an email address. When the pipeline job completes, whether successfully or not, an email is sent to that address.

  2. Initialize the Vertex AI SDK with the project, staging bucket, location, and experiment.

    from google.cloud import aiplatform
    
    aiplatform.init(
        project=PROJECT_ID,
        staging_bucket=BUCKET_URI,
        location=REGION,
        experiment=EXPERIMENT_NAME)
    
    aiplatform.autolog()
    

Define your pipeline tasks

In your notebook, define your pipeline custom_model_training_evaluation_pipeline:

from kfp import dsl
from kfp.dsl import importer
from kfp.dsl import OneOf
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.batch_predict_job import ModelBatchPredictOp
from google_cloud_pipeline_components.v1.model_evaluation import ModelEvaluationRegressionOp
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp
from google_cloud_pipeline_components.v1.endpoint import ModelDeployOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp
from google.cloud import aiplatform

# define the train-deploy pipeline
@dsl.pipeline(name="custom-model-training-evaluation-pipeline")
def custom_model_training_evaluation_pipeline(
    project: str,
    location: str,
    training_job_display_name: str,
    worker_pool_specs: list,
    base_output_dir: str,
    prediction_container_uri: str,
    model_display_name: str,
    batch_prediction_job_display_name: str,
    target_field_name: str,
    test_data_gcs_uri: list,
    ground_truth_gcs_source: list,
    batch_predictions_gcs_prefix: str,
    batch_predictions_input_format: str="csv",
    batch_predictions_output_format: str="jsonl",
    ground_truth_format: str="csv",
    parent_model_resource_name: str=None,
    parent_model_artifact_uri: str=None,
    existing_model: bool=False

):
    # Notification task
    notify_task = VertexNotificationEmailOp(
                    recipients= EMAIL_RECIPIENTS
                    )
    with dsl.ExitHandler(notify_task, name='MLOps Continuous Training Pipeline'):
        # Train the model
        custom_job_task = CustomTrainingJobOp(
                                    project=project,
                                    display_name=training_job_display_name,
                                    worker_pool_specs=worker_pool_specs,
                                    base_output_directory=base_output_dir,
                                    location=location
                            )

        # Import the unmanaged model
        import_unmanaged_model_task = importer(
                                        artifact_uri=base_output_dir,
                                        artifact_class=artifact_types.UnmanagedContainerModel,
                                        metadata={
                                            "containerSpec": {
                                                "imageUri": prediction_container_uri,
                                            },
                                        },
                                    ).after(custom_job_task)

        with dsl.If(existing_model == True):
            # Import the parent model to upload as a version
            import_registry_model_task = importer(
                                        artifact_uri=parent_model_artifact_uri,
                                        artifact_class=artifact_types.VertexModel,
                                        metadata={
                                            "resourceName": parent_model_resource_name
                                        },
                                    ).after(import_unmanaged_model_task)
            # Upload the model as a version
            model_version_upload_op = ModelUploadOp(
                                    project=project,
                                    location=location,
                                    display_name=model_display_name,
                                    parent_model=import_registry_model_task.outputs["artifact"],
                                    unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
                                )

        with dsl.Else():
            # Upload the model
            model_upload_op = ModelUploadOp(
                                    project=project,
                                    location=location,
                                    display_name=model_display_name,
                                    unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
                                )
        # Get the model (or model version)
        model_resource = OneOf(model_version_upload_op.outputs["model"], model_upload_op.outputs["model"])

        # Batch prediction
        batch_predict_task = ModelBatchPredictOp(
                            project= project,
                            job_display_name= batch_prediction_job_display_name,
                            model= model_resource,
                            location= location,
                            instances_format= batch_predictions_input_format,
                            predictions_format= batch_predictions_output_format,
                            gcs_source_uris= test_data_gcs_uri,
                            gcs_destination_output_uri_prefix= batch_predictions_gcs_prefix,
                            machine_type= 'n1-standard-2'
                            )
        # Evaluation task
        evaluation_task = ModelEvaluationRegressionOp(
                            project= project,
                            target_field_name= target_field_name,
                            location= location,
                            # model= model_resource,
                            predictions_format= batch_predictions_output_format,
                            predictions_gcs_source= batch_predict_task.outputs["gcs_output_directory"],
                            ground_truth_format= ground_truth_format,
                            ground_truth_gcs_source= ground_truth_gcs_source
                            )
    return

Your pipeline consists of a graph of tasks that use the following Google Cloud Pipeline Components:

• CustomTrainingJobOp: runs the custom training package.
• importer (from KFP): imports the trained model artifact and, when a parent model exists, the registered model version.
• ModelUploadOp: uploads the model, or a new model version, to Vertex AI Model Registry.
• ModelBatchPredictOp: runs a batch prediction job on the test data.
• ModelEvaluationRegressionOp: evaluates the regression model against the ground truth.
• VertexNotificationEmailOp: sends an email notification when the pipeline exits.

Compile the pipeline

Compile the pipeline using the Kubeflow Pipelines (KFP) compiler into a YAML file that contains a hermetic representation of your pipeline.

from kfp import dsl
from kfp import compiler

compiler.Compiler().compile(
    pipeline_func=custom_model_training_evaluation_pipeline,
    package_path="{}.yaml".format(PIPELINE_NAME),
)

You should see a YAML file named vertex-pipeline-datatrigger-tutorial.yaml in your working directory.

Upload the pipeline as a template

  1. Create a repository of type KFP in Artifact Registry.

    REPO_NAME = "mlops"
    # Create a repo in the artifact registry
    ! gcloud artifacts repositories create $REPO_NAME --location=$REGION --repository-format=KFP
    
  2. Upload the compiled pipeline to the repository.

    from kfp.registry import RegistryClient
    
    host = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}"
    client = RegistryClient(host=host)
    TEMPLATE_NAME, VERSION_NAME = client.upload_pipeline(
        file_name=PIPELINE_FILE,
        tags=["v1", "latest"],
        extra_headers={"description": "This is an example pipeline template."})
    TEMPLATE_URI = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{TEMPLATE_NAME}/latest"
    
  3. In the Google Cloud console, verify that your template appears in Pipeline Templates.

    Go to Pipeline Templates

Run the pipeline manually

To make sure that the pipeline works, run it manually.

  1. In your notebook, define the parameters needed to run the pipeline as a job.

    DATASET_NAME = "mlops"
    TABLE_NAME = "chicago"
    
    worker_pool_specs = [{
                            "machine_spec": {"machine_type": "e2-highmem-2"},
                            "replica_count": 1,
                            "python_package_spec":{
                                    "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest",
                                    "package_uris": [f"{BUCKET_URI}/trainer-0.1.tar.gz"],
                                    "python_module": "trainer.task",
                                    "args":["--project-id",PROJECT_ID, "--training-dir",f"/gcs/{BUCKET_NAME}","--bq-source",f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"]
                            },
    }]
    
    parameters = {
        "project": PROJECT_ID,
        "location": REGION,
        "training_job_display_name": "taxifare-prediction-training-job",
        "worker_pool_specs": worker_pool_specs,
        "base_output_dir": BUCKET_URI,
        "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
        "model_display_name": "taxifare-prediction-model",
        "batch_prediction_job_display_name": "taxifare-prediction-batch-job",
        "target_field_name": "fare",
        "test_data_gcs_uri": [f"{BUCKET_URI}/test_no_target.csv"],
        "ground_truth_gcs_source": [f"{BUCKET_URI}/test.csv"],
        "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output",
        "existing_model": False
    }
    
  2. Create and run a pipeline job.

    # Create a pipeline job
    job = aiplatform.PipelineJob(
        display_name="triggered_custom_regression_evaluation",
        template_path=TEMPLATE_URI,
        parameter_values=parameters,
        pipeline_root=BUCKET_URI,
        enable_caching=False
    )
    # Run the pipeline job
    job.run()
    

    The job takes about 30 minutes to complete.

  3. In the console, you should see a new pipeline run on the Pipelines page (a sketch for checking the run from your notebook follows this list):

    Go to Pipeline Runs

  4. After the pipeline run is complete, you should see either a new model named taxifare-prediction-model or a new model version in Vertex AI Model Registry:

    Go to Model Registry

  5. You should also see a new batch prediction job:

    Go to Batch Predictions
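
Instead of watching the console, you can also check the run from your notebook. The following sketch uses the Vertex AI SDK and assumes it was initialized earlier with aiplatform.init; the display name matches the job created above.

    # Optional sketch: list recent runs of this pipeline and print their states.
    from google.cloud import aiplatform

    runs = aiplatform.PipelineJob.list(
        filter='display_name="triggered_custom_regression_evaluation"',
        order_by="create_time desc",
    )
    for run in runs[:3]:
        print(run.resource_name, run.state)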

Create a function that triggers your pipeline

In this step, you create a Cloud Function (2nd gen) that runs the pipeline whenever new data is inserted into the BigQuery table.

Specifically, we use Eventarc to trigger the function whenever a google.cloud.bigquery.v2.JobService.InsertJob event occurs. The function then runs the pipeline template.

For more information, see Eventarc triggers and supported event types.
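
The function you create next inspects the Cloud Audit Logs payload carried by the CloudEvent. An illustrative, simplified shape of the fields it reads is sketched below; the field names follow the code in main.py, while the values are examples only.

    # Illustrative sketch of the CloudEvent data the function inspects (values are examples).
    example_event_data = {
        "protoPayload": {
            "methodName": "google.cloud.bigquery.v2.JobService.InsertJob",
            "resourceName": "projects/PROJECT_ID/datasets/mlops/tables/chicago",
            "authenticationInfo": {"principalEmail": "user@example.com"},
            "metadata": {
                "tableDataChange": {
                    # The function submits the pipeline job when this count is greater than 0.
                    "insertedRowsCount": "1000000"
                }
            },
        }
    }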

Create the function with an Eventarc trigger

  1. In the Google Cloud console, go to Cloud Functions.

    Go to Cloud Functions

  2. Click the Create Function button. On the Configuration page:

    1. Select 2nd gen as your environment.

    2. For Function name, use mlops.

    3. For Region, select the same region as your Cloud Storage bucket and Artifact Registry repository.

    4. For Trigger, select Other trigger. The Eventarc Trigger pane opens.

      1. For Trigger Type, choose Google Sources.

      2. For Event Provider, choose BigQuery.

      3. For Event type, choose google.cloud.bigquery.v2.JobService.InsertJob.

      4. For Resource, choose Specific resource and specify the BigQuery table

        projects/PROJECT_ID/datasets/mlops/tables/chicago
        
      5. In the Region field, select a location for the Eventarc trigger, if applicable. See Trigger location for more information.

      6. Click Save Trigger.

    5. If you're asked to grant roles to service accounts, click Grant All.

  3. Click Next to go to the Code page. On the Code page:

    1. Set the Runtime to Python 3.12.

    2. Set the Entry point to mlops_entrypoint.

    3. With the Inline Editor, open the file main.py and replace the contents with the following:

      Replace PROJECT_ID, REGION, and BUCKET_NAME with the values you used earlier.

      import json
      import functions_framework
      import requests
      import google.auth
      import google.auth.transport.requests
      # CloudEvent function to be triggered by an Eventarc Cloud Audit Logging trigger
      # Note: this is NOT designed for second-party (Cloud Audit Logs -> Pub/Sub) triggers!
      @functions_framework.cloud_event
      def mlops_entrypoint(cloudevent):
          # Print out the CloudEvent's (required) `type` property
          # See https://github.com/cloudevents/spec/blob/v1.0.1/spec.md#type
          print(f"Event type: {cloudevent['type']}")
      
          # Print out the CloudEvent's (optional) `subject` property
          # See https://github.com/cloudevents/spec/blob/v1.0.1/spec.md#subject
          if 'subject' in cloudevent:
              # CloudEvent objects don't support `get` operations.
              # Use the `in` operator to verify `subject` is present.
              print(f"Subject: {cloudevent['subject']}")
      
          # Print out details from the `protoPayload`
          # This field encapsulates a Cloud Audit Logging entry
          # See https://cloud.google.com/logging/docs/audit#audit_log_entry_structure
      
          payload = cloudevent.data.get("protoPayload")
          if payload:
              print(f"API method: {payload.get('methodName')}")
              print(f"Resource name: {payload.get('resourceName')}")
              print(f"Principal: {payload.get('authenticationInfo', dict()).get('principalEmail')}")
              row_count = payload.get('metadata', dict()).get('tableDataChange',dict()).get('insertedRowsCount')
              print(f"No. of rows: {row_count} !!")
              if row_count:
                  if int(row_count) > 0:
                      print ("Pipeline trigger Condition met !!")
                      submit_pipeline_job()
              else:
                  print ("No pipeline triggered !!!")
      
      def submit_pipeline_job():
          PROJECT_ID = 'PROJECT_ID'
          REGION = 'REGION'
          BUCKET_NAME = "BUCKET_NAME"
          DATASET_NAME = "mlops"
          TABLE_NAME = "chicago"
      
          base_output_dir = BUCKET_NAME
          BUCKET_URI = "gs://{}".format(BUCKET_NAME)
          PIPELINE_ROOT = "{}/pipeline_root/chicago-taxi-pipe".format(BUCKET_URI)
          PIPELINE_NAME = "vertex-mlops-pipeline-tutorial"
          EXPERIMENT_NAME = PIPELINE_NAME + "-experiment"
          REPO_NAME ="mlops"
          TEMPLATE_NAME="custom-model-training-evaluation-pipeline"
          TRAINING_JOB_DISPLAY_NAME="taxifare-prediction-training-job"
          worker_pool_specs = [{
                              "machine_spec": {"machine_type": "e2-highmem-2"},
                              "replica_count": 1,
                              "python_package_spec":{
                                      "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest",
                                      "package_uris": [f"{BUCKET_URI}/trainer-0.1.tar.gz"],
                                      "python_module": "trainer.task",
                                      "args":["--project-id",PROJECT_ID,"--training-dir",f"/gcs/{BUCKET_NAME}","--bq-source",f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"]
                              },
          }]
      
          parameters = {
              "project": PROJECT_ID,
              "location": REGION,
              "training_job_display_name": "taxifare-prediction-training-job",
              "worker_pool_specs": worker_pool_specs,
              "base_output_dir": BUCKET_URI,
              "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
              "model_display_name": "taxifare-prediction-model",
              "batch_prediction_job_display_name": "taxifare-prediction-batch-job",
              "target_field_name": "fare",
              "test_data_gcs_uri": [f"{BUCKET_URI}/test_no_target.csv"],
              "ground_truth_gcs_source": [f"{BUCKET_URI}/test.csv"],
              "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output",
              "existing_model": False
          }
          TEMPLATE_URI = f"http://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{TEMPLATE_NAME}/latest"
          print("TEMPLATE URI: ", TEMPLATE_URI)
          request_body = {
              "name": PIPELINE_NAME,
              "displayName": PIPELINE_NAME,
              "runtimeConfig":{
                  "gcsOutputDirectory": PIPELINE_ROOT,
                  "parameterValues": parameters,
              },
              "templateUri": TEMPLATE_URI
          }
          pipeline_url = "http://us-central1-aiplatform.googleapis.com/v1/projects/{}/locations/{}/pipelineJobs".format(PROJECT_ID, REGION)
          creds, project = google.auth.default()
          auth_req = google.auth.transport.requests.Request()
          creds.refresh(auth_req)
          headers = {
          'Authorization': 'Bearer {}'.format(creds.token),
          'Content-Type': 'application/json; charset=utf-8'
          }
          response = requests.request("POST", pipeline_url, headers=headers, data=json.dumps(request_body))
          print(response.text)
      
    4. Open the file requirements.txt and replace the contents with the following:

      requests==2.31.0
      google-auth==2.25.1
      
  4. Click Deploy to deploy the function.

Insert data to trigger the pipeline

  1. In the Google Cloud console, go to BigQuery Studio.

    Go to BigQuery

  2. Click Create SQL Query and run the following SQL query by clicking Run.

    INSERT INTO `PROJECT_ID.mlops.chicago`
    (
        WITH
          taxitrips AS (
          SELECT
            trip_start_timestamp,
            trip_end_timestamp,
            trip_seconds,
            trip_miles,
            payment_type,
            pickup_longitude,
            pickup_latitude,
            dropoff_longitude,
            dropoff_latitude,
            tips,
            tolls,
            fare,
            pickup_community_area,
            dropoff_community_area,
            company,
            unique_key
          FROM
            `bigquery-public-data.chicago_taxi_trips.taxi_trips`
          WHERE pickup_longitude IS NOT NULL
          AND pickup_latitude IS NOT NULL
          AND dropoff_longitude IS NOT NULL
          AND dropoff_latitude IS NOT NULL
          AND trip_miles > 0
          AND trip_seconds > 0
          AND fare > 0
          AND EXTRACT(YEAR FROM trip_start_timestamp) = 2022
        )
    
        SELECT
          trip_start_timestamp,
          EXTRACT(MONTH from trip_start_timestamp) as trip_month,
          EXTRACT(DAY from trip_start_timestamp) as trip_day,
          EXTRACT(DAYOFWEEK from trip_start_timestamp) as trip_day_of_week,
          EXTRACT(HOUR from trip_start_timestamp) as trip_hour,
          trip_seconds,
          trip_miles,
          payment_type,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1)
          ) AS pickup_grid,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1)
          ) AS dropoff_grid,
          ST_Distance(
              ST_GeogPoint(pickup_longitude, pickup_latitude),
              ST_GeogPoint(dropoff_longitude, dropoff_latitude)
          ) AS euclidean,
          CONCAT(
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(pickup_longitude,
                  pickup_latitude), 0.1)),
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(dropoff_longitude,
                  dropoff_latitude), 0.1))
          ) AS loc_cross,
          IF((tips/fare >= 0.2), 1, 0) AS tip_bin,
          tips,
          tolls,
          fare,
          pickup_longitude,
          pickup_latitude,
          dropoff_longitude,
          dropoff_latitude,
          pickup_community_area,
          dropoff_community_area,
          company,
          unique_key,
          trip_end_timestamp
        FROM
          taxitrips
        LIMIT 1000000
    )
    

    This SQL query inserts new rows into the table.

  3. To verify whether the event was triggered, search your function's logs for pipeline trigger condition met.

    Go to Cloud Functions

  4. If the function is triggered successfully, you should see a new pipeline run in Vertex AI Pipelines. The pipeline job takes about 30 minutes to complete.

    Go to Vertex AI Pipelines

Clean up

To clean up all Google Cloud resources used in this project, you can delete the Google Cloud project that you used for the tutorial.

Otherwise, you can delete the individual resources you created for this tutorial.

  1. Delete the Colab notebook.

    Go to Colab Enterprise

  2. Delete the BigQuery dataset.

    Go to BigQuery

  3. Delete the Cloud Storage bucket.

    Go to Cloud Storage

  4. Delete your resources from Vertex AI.

  5. Delete the repository from Artifact Registry.

    Go to Artifact Registry

  6. Delete the Cloud Function.

    Go to Cloud Functions