Estendi privacy differenziale

Questo documento fornisce esempi di come estendere la privacy differenziale per la privacy differenziale di BigQuery.

Panoramica

BigQuery consente di estendere la privacy differenziale alle origini dati multi-cloud e alle librerie di privacy differenziale esterne. Questo documento fornisce esempi di come applicare la privacy differenziale per origini dati multi-cloud come AWS S3 con BigQuery Omni, di come chiamare una libreria di privacy differenziale esterna utilizzando una funzione remota e di come eseguire aggregazioni di privacy differenziale con PipelineDP, una libreria Python che può essere eseguita con Apache Spark e Apache Beam.

Per maggiori informazioni sulla privacy differenziale, consulta Usare la privacy differenziale.

Privacy differenziale con BigQuery Omni

La privacy differenziale di BigQuery supporta le chiamate a origini dati multi-cloud come AWS S3. L'esempio seguente esegue query su una sorgente di dati esterna, foo.wikidata e applica la privacy differenziale. Per ulteriori informazioni sulla sintassi della clausola sulla privacy differenziale, consulta la clausola sulla privacy differenziale.

SELECT
  WITH
    DIFFERENTIAL_PRIVACY
      OPTIONS (
        epsilon = 1,
        delta = 1e-5,
        privacy_unit_column = foo.wikidata.es_description)
      COUNT(*) AS results
FROM foo.wikidata;

Questo esempio restituisce risultati simili ai seguenti:

-- These results will change each time you run the query.
+----------+
| results  |
+----------+
| 3465     |
+----------+

Per ulteriori informazioni sulle limitazioni di BigQuery Omni, consulta Limitazioni.

Chiamare librerie della privacy differenziale esterne con funzioni remote

Puoi chiamare le librerie della privacy differenziale esterne utilizzando funzioni remote. Il seguente link utilizza una funzione remota per chiamare una libreria esterna ospitata da Tumult Analytics al fine di utilizzare la privacy differenziale a zero concentrazione in un set di dati delle vendite al dettaglio.

Per informazioni sull'utilizzo di Tumult Analytics, consulta il post sul lancio di Tumult Analytics {: .external}.

Aggregazioni per privacy differenziale con PipelineDP

PipelineDP è una libreria Python che esegue aggregazioni per la privacy differenziale e può essere eseguita con Apache Spark e Apache Beam. BigQuery può eseguire stored procedure di Apache Spark scritte in Python. Per ulteriori informazioni sull'esecuzione delle stored procedure Apache Spark, consulta Utilizzare le stored procedure per Apache Spark.

L'esempio seguente esegue un'aggregazione della privacy differenziale utilizzando la libreria PipelineDP. Utilizza il set di dati pubblico Chicago Taxi Trips e calcola, per ogni taxi, il numero di corse, nonché la somma e la media dei suggerimenti per queste corse.

Prima di iniziare

Un'immagine Apache Spark standard non include PipelineDP. Prima di eseguire una stored procedure di PipelineDP, devi creare un'immagine Docker contenente tutte le dipendenze necessarie. Questa sezione descrive come creare ed eseguire il push di un'immagine Docker su Google Cloud.

Prima di iniziare, assicurati di aver installato Docker sulla macchina locale e di aver configurato l'autenticazione per il push delle immagini Docker in gcr.io. Per ulteriori informazioni sul push delle immagini Docker, consulta Eseguire il push e il pull delle immagini.

Crea ed esegui il push di un'immagine Docker

Per creare ed eseguire il push di un'immagine Docker con le dipendenze richieste, segui questi passaggi:

  1. Crea una cartella locale DIR.
  2. Scarica il programma di installazione di Miniconda, con la versione Python 3.9, su DIR.
  3. Salva il testo seguente nel Dockerfile.

    
      # Debian 11 is recommended.
      FROM debian:11-slim
    
      # Suppress interactive prompts
      ENV DEBIAN_FRONTEND=noninteractive
    
      # (Required) Install utilities required by Spark scripts.
      RUN apt update && apt install -y procps tini libjemalloc2
    
      # Enable jemalloc2 as default memory allocator
      ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2
    
      # Install and configure Miniconda3.
      ENV CONDA_HOME=/opt/miniconda3
      ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
      ENV PATH=${CONDA_HOME}/bin:${PATH}
      COPY Miniconda3-py39_23.1.0-1-Linux-x86_64.sh .
      RUN bash Miniconda3-py39_23.1.0-1-Linux-x86_64.sh -b -p /opt/miniconda3 \
      && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
      && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
      && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
      && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
    
      # The following packages are installed in the default image, it is
      # strongly recommended to include all of them.
      #
      # Use mamba to install packages quickly.
      RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
      && ${CONDA_HOME}/bin/mamba install \
        conda \
        cython \
        fastavro \
        fastparquet \
        gcsfs \
        google-cloud-bigquery-storage \
        google-cloud-bigquery[pandas] \
        google-cloud-bigtable \
        google-cloud-container \
        google-cloud-datacatalog \
        google-cloud-dataproc \
        google-cloud-datastore \
        google-cloud-language \
        google-cloud-logging \
        google-cloud-monitoring \
        google-cloud-pubsub \
        google-cloud-redis \
        google-cloud-spanner \
        google-cloud-speech \
        google-cloud-storage \
        google-cloud-texttospeech \
        google-cloud-translate \
        google-cloud-vision \
        koalas \
        matplotlib \
        nltk \
        numba \
        numpy \
        openblas \
        orc \
        pandas \
        pyarrow \
        pysal \
        pytables \
        python \
        regex \
        requests \
        rtree \
        scikit-image \
        scikit-learn \
        scipy \
        seaborn \
        sqlalchemy \
        sympy \
        virtualenv
    
      RUN apt install -y python3-pip
      RUN pip install --no-input pipeline-dp==0.2.0
    
      # (Required) Create the 'spark' group/user.
      # The GID and UID must be 1099. Home directory is required.
      RUN groupadd -g 1099 spark
      RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
      USER spark
    
    
  4. Esegui questo comando.

    
    IMAGE=gcr.io/PROJECT_ID/DOCKER_IMAGE:0.0.1
    # Build and push the image.
    docker build -t "${IMAGE}"
    docker push "${IMAGE}"
    
    

    Sostituisci quanto segue:

    • PROJECT_ID: il progetto in cui vuoi creare l'immagine Docker.
    • DOCKER_IMAGE: nome dell'immagine Docker.

    L'immagine viene caricata.

Esegui una procedura archiviata di PipelineDP

  1. Per creare una stored procedure, utilizza l'istruzione CREATE PROCEDURE.

    CREATE OR REPLACE
    PROCEDURE
      `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()
      WITH CONNECTION `PROJECT_ID.REGION.CONNECTION_ID`
    OPTIONS (
      engine = "SPARK",
      container_image= "gcr.io/PROJECT_ID/DOCKER_IMAGE")
    LANGUAGE PYTHON AS R"""
    from pyspark.sql import SparkSession
    import pipeline_dp
    
    def compute_dp_metrics(data, spark_context):
    budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=10,
                                                          total_delta=1e-6)
    backend = pipeline_dp.SparkRDDBackend(spark_context)
    
    # Create a DPEngine instance.
    dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)
    
    params = pipeline_dp.AggregateParams(
        noise_kind=pipeline_dp.NoiseKind.LAPLACE,
        metrics=[
            pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM,
            pipeline_dp.Metrics.MEAN],
        max_partitions_contributed=1,
        max_contributions_per_partition=1,
        min_value=0,
        # Tips that are larger than 100 will be clipped to 100.
        max_value=100)
    # Specify how to extract privacy_id, partition_key and value from an
    # element of the taxi dataset.
    data_extractors = pipeline_dp.DataExtractors(
        partition_extractor=lambda x: x.taxi_id,
        privacy_id_extractor=lambda x: x.unique_key,
        value_extractor=lambda x: 0 if x.tips is None else x.tips)
    
    # Run aggregation.
    dp_result = dp_engine.aggregate(data, params, data_extractors)
    budget_accountant.compute_budgets()
    dp_result = backend.map_tuple(dp_result, lambda pk, result: (pk, result.count, result.sum, result.mean))
    return dp_result
    
    spark = SparkSession.builder.appName("spark-pipeline-dp-demo").getOrCreate()
    spark_context = spark.sparkContext
    
    # Load data from BigQuery.
    taxi_trips = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data:chicago_taxi_trips.taxi_trips") \
    .load().rdd
    dp_result = compute_dp_metrics(taxi_trips, spark_context).toDF(["pk", "count","sum", "mean"])
    # Saving the data to BigQuery
    dp_result.write.format("bigquery") \
    .option("writeMethod", "direct") \
    .save("DATASET_ID.TABLE_NAME")
    """;
    

    Sostituisci quanto segue:

    • PROJECT_ID: il progetto in cui vuoi creare la stored procedure.
    • DATASET_ID: il set di dati in cui vuoi creare la stored procedure.
    • REGION: la regione in cui si trova il progetto.
    • DOCKER_IMAGE: nome dell'immagine Docker.
    • CONNECTION_ID: il nome della connessione.
    • TABLE_NAME: il nome della tabella.
  2. Utilizza l'istruzione CALL per chiamare la procedura.

    CALL `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()
    

    Sostituisci quanto segue:

    • PROJECT_ID: il progetto in cui vuoi creare la stored procedure.
    • DATASET_ID: il set di dati in cui vuoi creare la stored procedure.

Passaggi successivi