Usa el conector de Spark de Bigtable
El conector de Spark de Bigtable te permite leer y escribir datos en y desde Bigtable. Puedes leer datos de tu aplicación Spark con Spark SQL y DataFrames. Las siguientes operaciones de Bigtable son compatibles con el conector Spark de Bigtable:
- Escribe datos
- Lee datos
- Crea una tabla nueva
En este documento, se muestra cómo convertir una tabla de DataFrames de Spark SQL en una tabla de Bigtable y, luego, compilar y crear un archivo JAR para enviar un trabajo de Spark.
Estado de compatibilidad de Spark y Scala
El conector de Spark de Bigtable solo es compatible con la versión 2.12 de Scala y las siguientes versiones de Spark:
El conector de Spark de Bigtable admite las siguientes versiones de Dataproc:
- Clúster de versiones de imágenes 1.5
- Clúster de versiones de imagen 2.0
- Clúster de versiones de imágenes 2.1
- Clúster de versiones de imagen 2.2
- Versión 1.0 del entorno de ejecución sin servidores de Dataproc
- Versión 2.0 del entorno de ejecución sin servidores de Dataproc
Calcula los costos
Si decides usar cualquiera de los siguientes componentes facturables de Google Cloud, se te facturarán los recursos que uses:
- Bigtable (no se te cobra por usar el emulador de Bigtable)
- Dataproc
- Cloud Storage
Los precios de Dataproc se aplican al uso de Dataproc en clústeres de Compute Engine. Los precios de Dataproc Serverless se aplican a las cargas de trabajo y a las sesiones que se ejecutan en Dataproc Serverless para Spark.
Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios.
Antes de comenzar
Completa los siguientes requisitos previos antes de usar el conector de Spark de Bigtable.
Funciones obligatorias
Si quieres obtener los permisos que necesitas para usar el conector de Spark de Bigtable, pídele a tu administrador que te otorgue los siguientes roles de IAM en tu proyecto:
-
Administrador de Bigtable (
roles/bigtable.admin
)(opcional): Te permite leer o escribir datos, y crear una tabla nueva. -
Usuario de Bigtable (
roles/bigtable.user
): Te permite leer o escribir datos, pero no crear una tabla nueva.
Si quieres obtener más información para otorgar roles, consulta Administra el acceso.
Es posible que también puedas obtener los permisos necesarios a través de los roles personalizados o de otros roles predefinidos.
Si usas Dataproc o Cloud Storage, es posible que se requieran permisos adicionales. Para obtener más información, consulta Permisos de Dataproc y permisos de Cloud Storage.
Configura Spark
Además de crear una instancia de Bigtable, también debes configurar tu instancia de Spark. Puedes hacerlo de forma local o seleccionar cualquiera de estas opciones para usar Spark con Dataproc:
- Clúster de Dataproc
- Dataproc sin servidores
Para obtener más información sobre cómo elegir entre un clúster de Dataproc o una opción sin servidores, consulta la documentación de Dataproc sin servidores para Spark en comparación con Dataproc en Compute Engine .
Descarga el archivo JAR del conector
Puedes encontrar el código fuente del conector de Spark de Bigtable con ejemplos en el repositorio de GitHub del conector de Spark de Bigtable.
Según la configuración de Spark, puedes acceder al archivo JAR de la siguiente manera:
Si ejecutas PySpark de forma local, debes descargar el archivo JAR del conector desde la ubicación
gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
de Cloud Storage.Reemplaza
SCALA_VERSION
por la versión de Scala, establecida en2.12
como la única versión compatible, y enCONNECTOR_VERSION
por la versión del conector que deseas usar.En el caso del clúster de Dataproc o la opción sin servidores, usa el archivo JAR más reciente como un artefacto que se pueda agregar a tus aplicaciones de Scala o Java Spark. Para obtener más información sobre el uso del archivo JAR como artefacto, consulta Cómo administrar dependencias.
Si envías tu trabajo de PySpark a Dataproc, usa la marca
gcloud dataproc jobs submit pyspark --jars
para establecer el URI en la ubicación del archivo JAR en Cloud Storage, por ejemplo,gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
.
Agrega la configuración de Bigtable a tu aplicación Spark
En tu aplicación Spark, agrega las opciones de Spark que te permiten interactuar con Bigtable.
Opciones de Spark compatibles
Usa las opciones de Spark que están disponibles como parte del paquete com.google.cloud.spark.bigtable
.
Nombre de la opción | Requeridos | Valor predeterminado | Significado |
---|---|---|---|
spark.bigtable.project.id |
Sí | N/A | Configura el ID del proyecto de Bigtable. |
spark.bigtable.instance.id |
Sí | N/A | Establece el ID de la instancia de Bigtable. |
catalog |
Sí | N/A | Establece el formato JSON que especifica el formato de conversión entre el esquema similar a SQL del DataFrame y el esquema de la tabla de Bigtable. Consulta Crea metadatos de tabla en formato JSON para obtener más información. |
spark.bigtable.app_profile.id |
No | default |
Configura el ID de perfil de la app de Bigtable. |
spark.bigtable.write.timestamp.milliseconds |
No | Hora actual del sistema | Configura la marca de tiempo en milisegundos que se usará cuando se escriba un DataFrame en Bigtable. Ten en cuenta que, como todas las filas del DataFrame usan la misma marca de tiempo, las filas con la misma columna de clave de fila en el DataFrame se conservan como una sola versión en Bigtable, ya que comparten la misma marca de tiempo. |
spark.bigtable.create.new.table |
No | false |
Configúralo como true para crear una tabla nueva antes de escribir en Bigtable. |
spark.bigtable.read.timerange.start.milliseconds o spark.bigtable.read.timerange.end.milliseconds |
No | N/A | Establece marcas de tiempo (en milisegundos desde el tiempo de época) para filtrar celdas con una fecha de inicio y de finalización específicas, respectivamente. Se deben especificar ambos o ninguno. |
spark.bigtable.push.down.row.key.filters |
No | true |
Configúralo como true para permitir el filtrado simple de clave de fila del servidor. El filtrado por claves de fila compuestas se implementa en el lado del cliente.Consulta Cómo leer una fila específica de DataFrame con un filtro para obtener más información. |
spark.bigtable.read.rows.attempt.timeout.milliseconds |
No | 30 min | Establece la duración del tiempo de espera para un intento de fila de lectura correspondiente a una partición de DataFrame en el cliente de Bigtable para Java. |
spark.bigtable.read.rows.total.timeout.milliseconds |
No | 12 h | Establece la duración del tiempo de espera total para un intento de fila de lectura correspondiente a una partición de DataFrame en el cliente de Bigtable para Java. |
spark.bigtable.mutate.rows.attempt.timeout.milliseconds |
No | 1 min | Establece la duración del tiempo de espera para un intento de mutación de filas correspondiente a una partición de DataFrame en el cliente de Bigtable para Java. |
spark.bigtable.mutate.rows.total.timeout.milliseconds |
No | 10 min | Establece la duración del tiempo de espera total para un intento de mutación de filas correspondiente a una partición de DataFrame en el cliente de Bigtable para Java. |
spark.bigtable.batch.mutate.size |
No | 100 |
Establece la cantidad de mutaciones en cada lote. El valor máximo que puedes establecer es 100000 . |
spark.bigtable.enable.batch_mutate.flow_control |
No | false |
Configúralo como true a fin de habilitar el control de flujo para las mutaciones por lotes. |
Crea metadatos de tablas en formato JSON
El formato de tabla DataFrames de Spark SQL debe convertirse en una tabla de Bigtable con una cadena con formato JSON. Este formato JSON de string hace que el formato de datos sea compatible con Bigtable. Puedes pasar el formato JSON en el código de la aplicación con la opción .option("catalog", catalog_json_string)
.
Como ejemplo, considera la siguiente tabla de DataFrame y la tabla de Bigtable correspondiente.
En este ejemplo, las columnas name
y birthYear
del DataFrame se agrupan en la familia de columnas info
y se les cambia el nombre a name
y birth_year
, respectivamente. De manera similar, la columna address
se almacena en la familia de columnas location
con el mismo nombre de columna. La columna id
del DataFrame se convierte en la clave de fila de Bigtable.
Las claves de fila no tienen un nombre de columna dedicado en Bigtable y, en este ejemplo, id_rowkey
solo se usa para indicarle al conector que esta es la columna de clave de fila. Puedes usar cualquier nombre para la columna de clave de fila y asegurarte de usar el mismo nombre cuando declares el campo "rowkey":"column_name"
en el formato JSON.
DataFrame | Tabla de Bigtable = t1 | |||||||
Columnas | Clave de fila | Familias de columnas | ||||||
información | ubicación | |||||||
Columnas | Columnas | |||||||
id | name | birthYear | address | id_rowkey | name | birth_year | address |
El formato JSON para el catálogo es el siguiente:
"""
{
"table": {"name": "t1"},
"rowkey": "id_rowkey",
"columns": {
"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
"name": {"cf": "info", "col": "name", "type": "string"},
"birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
"address": {"cf": "location", "col": "address", "type": "string"}
}
}
"""
Las claves y los valores usados en el formato JSON son los siguientes:
Clave de catálogo | Valor de catálogo | Formato JSON |
---|---|---|
table | Nombre de la tabla de Bigtable. | "table":{"name":"t1"} Si la tabla no existe, usa .option("spark.bigtable.create.new.table", "true") para crearla. |
clave de fila | Nombre de la columna que se usará como clave de fila de Bigtable. Asegúrate de que el nombre de la columna de DataFrame se use como la clave de fila, por ejemplo, id_rowkey . Las claves compuestas también se aceptan como claves de fila. Por ejemplo, "rowkey":"name:address" . Este enfoque puede generar claves de fila que requieran un análisis completo de la tabla para todas las solicitudes de lectura. |
"rowkey":"id_rowkey" , |
columnas | Asignación de cada columna de DataFrame a la familia de columnas de Bigtable correspondiente ("cf" ) y al nombre de la columna ("col" ). El nombre de la columna puede ser diferente del nombre de la columna en la tabla DataFrame. Los tipos de datos admitidos incluyen string , long y binary . |
"columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}" En este ejemplo, id_rowkey es la clave de fila, y info y location son las familias de columnas. |
Tipos de datos admitidos
El conector admite el uso de los tipos string
, long
y binary
(array de bytes) en el catálogo. Hasta que se agregue compatibilidad con otros tipos, como int
y float
, puedes convertir manualmente esos tipos de datos en arreglos de bytes (BinaryType
de Spark SQL) antes de usar el conector para escribirlos en Bigtable.
Además, puedes usar Avro para serializar tipos complejos, como ArrayType
. Para obtener más información, consulta Serializa tipos de datos complejos con Apache Avro.
Escribe en Bigtable
Usa la función .write()
y las opciones compatibles para escribir tus datos en Bigtable.
Java
El siguiente código del repositorio de GitHub usa Java y Maven para escribir en Bigtable.
String catalog = "{" +
"\"table\":{\"name\":\"" + tableName + "\"," +
"\"tableCoder\":\"PrimitiveType\"}," +
"\"rowkey\":\"wordCol\"," +
"\"columns\":{" +
"\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
"\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
"}}".replaceAll("\\s+", "");
…
private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
String createNewTable) {
dataframe
.write()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.option("spark.bigtable.create.new.table", createNewTable)
.save();
}
Python
El siguiente código del repositorio de GitHub usa Python para escribir en Bigtable.
catalog = ''.join(("""{
"table":{"name":" """ + bigtable_table_name + """
", "tableCoder":"PrimitiveType"},
"rowkey":"wordCol",
"columns":{
"word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
"count":{"cf":"example_family", "col":"countCol", "type":"long"}
}
}""").split())
…
input_data = spark.createDataFrame(data)
print('Created the DataFrame:')
input_data.show()
input_data.write \
.format('bigtable') \
.options(catalog=catalog) \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.option('spark.bigtable.create.new.table', create_new_table) \
.save()
print('DataFrame was written to Bigtable.')
…
Lee desde Bigtable
Usa la función .read()
para verificar si la tabla se importó correctamente a Bigtable.
Java
…
private static Dataset<Row> readDataframeFromBigtable(String catalog) {
Dataset<Row> dataframe = spark
.read()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.load();
return dataframe;
}
Python
…
records = spark.read \
.format('bigtable') \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.options(catalog=catalog) \
.load()
print('Reading the DataFrame from Bigtable:')
records.show()
Compila tu proyecto
Genera el archivo JAR que se usa para ejecutar un trabajo en un clúster de Dataproc, en Dataproc Serverless o en una instancia local de Spark. Puedes compilar el archivo JAR de forma local y, luego, usarlo para enviar un trabajo. La ruta al archivo JAR compilado se establece como la variable de entorno PATH_TO_COMPILED_JAR
cuando envías un trabajo.
Este paso no se aplica a las aplicaciones de PySpark.
Administra dependencias
El conector de Spark de Bigtable admite las siguientes herramientas de administración de dependencias:
Compila el archivo JAR
Maven
Agrega la dependencia
spark-bigtable
a tu archivo pom.xml.<dependencies> <dependency> <groupId>com.google.cloud.spark.bigtable</groupId> <artifactId>spark-bigtable_SCALA_VERSION</artifactId> <version>0.1.0</version> </dependency> </dependencies>
Agrega el complemento Maven Shade a tu archivo
pom.xml
para crear un archivo uber JAR:<plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins>
Ejecuta el comando
mvn clean install
para generar un archivo JAR.
SBT
Agrega la dependencia
spark-bigtable
a tu archivobuild.sbt
:libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
Agrega el complemento
sbt-assembly
a tu archivoproject/plugins.sbt
oproject/assembly.sbt
para crear un archivo Uber JAR.addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
Ejecuta el comando
sbt clean assembly
para generar el archivo JAR.
Gradle
Agrega la dependencia
spark-bigtable
al archivobuild.gradle
.dependencies { implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0' }
Agrega el complemento Shadow a tu archivo
build.gradle
para crear un archivo uber JAR:plugins { id 'com.github.johnrengelman.shadow' version '8.1.1' id 'java' }
Consulta la documentación del complemento Shadow para obtener más información sobre la configuración y la compilación de JAR.
Envía un trabajo
Envía un trabajo de Spark mediante Dataproc, Dataproc Serverless o una instancia local de Spark para iniciar tu aplicación.
Configura el entorno de ejecución
Configura las siguientes variables de entorno:
#Google Cloud
export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE
#Dataproc Serverless
export BIGTABLE_SPARK_SUBNET=SUBNET
export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME
#Scala/Java
export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR
#PySpark
export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR
Reemplaza lo siguiente:
- PROJECT_ID: El identificador permanente del proyecto de Bigtable.
- INSTANCE_ID: El identificador permanente de la instancia de Bigtable.
- TABLE_NAME: El identificador permanente de la tabla.
- DATAPROC_CLUSTER: El identificador permanente del clúster de Dataproc.
- DATAPROC_REGION: Es la región de Dataproc que contiene uno de los clústeres en tu instancia de Dataproc, por ejemplo,
northamerica-northeast2
. - DATAPROC_ZONE: Es la zona en la que se ejecuta el clúster de Dataproc.
- SUBNET: Es la ruta de acceso completa del recurso de la subred.
- GCS_BUCKET_NAME: El bucket de Cloud Storage para subir las dependencias de las cargas de trabajo de Spark.
- PATH_TO_COMPILED_JAR: Es la ruta de acceso completa o relativa al JAR compilado, por ejemplo,
/path/to/project/root/target/<compiled_JAR_name>
para Maven. - GCS_PATH_TO_CONNECTOR_JAR: El bucket
gs://spark-lib/bigtable
de Cloud Storage, en el que se encuentra el archivospark-bigtable_SCALA_VERSION_CONNECTOR_VERSION.jar
. - PATH_TO_PYTHON_FILE: En las aplicaciones de PySpark, es la ruta de acceso al archivo de Python que se usará para escribir datos y leerlos de Bigtable.
- LOCAL_PATH_TO_CONNECTOR_JAR: Para las aplicaciones de PySpark, es la ruta de acceso al archivo JAR del conector de Spark de Bigtable descargado.
Envía un trabajo de Spark
Para las instancias de Dataproc o tu configuración local de Spark, ejecuta un trabajo de Spark para subir datos a Bigtable.
Clúster de Dataproc
Usar el archivo JAR compilado y crear un trabajo de clúster de Dataproc que lea y escriba datos desde y en Bigtable
Crea un clúster de Dataproc. En el siguiente ejemplo, aparece un comando de muestra para crear un clúster de Dataproc v2.0 con Debian 10, dos nodos trabajadores y parámetros de configuración predeterminados.
gcloud dataproc clusters create \ $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \ --zone $BIGTABLE_SPARK_DATAPROC_ZONE \ --master-machine-type n2-standard-4 --master-boot-disk-size 500 \ --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \ --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
Enviar un trabajo
Scala/Java
En el siguiente ejemplo, se muestra la clase
spark.bigtable.example.WordCount
que incluye la lógica para crear una tabla de prueba en DataFrame, escribir la tabla en Bigtable y, luego, contar la cantidad de palabras en la tabla.gcloud dataproc jobs submit spark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --class=spark.bigtable.example.WordCount \ --jar=$PATH_TO_COMPILED_JAR \ -- \ $BIGTABLE_SPARK_PROJECT_ID \ $BIGTABLE_SPARK_INSTANCE_ID \ $BIGTABLE_SPARK_TABLE_NAME \
PySpark
gcloud dataproc jobs submit pyspark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --jars=$GCS_PATH_TO_CONNECTOR_JAR \ --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \ $PATH_TO_PYTHON_FILE \ -- \ --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \ --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \ --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
Dataproc sin servidores
Usar el archivo JAR compilado y crear un trabajo de Dataproc que lea y escriba datos desde y en Bigtable con una instancia sin servidores de Dataproc
Scala/Java
gcloud dataproc batches submit spark \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
-- \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
--jars=$GCS_PATH_TO_CONNECTOR_JAR \
--properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
-- \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
Spark local
Usar el archivo JAR descargado y crear un trabajo de Spark que lea y escriba datos en Bigtable y en Bigtable con una instancia local de Spark También puedes usar el emulador de Bigtable para enviar el trabajo de Spark.
Usa el emulador de Bigtable
Si decides usar el emulador de Bigtable, sigue estos pasos:
Ejecuta el siguiente comando para iniciar el emulador
gcloud beta emulators bigtable start
De forma predeterminada, el emulador elige
localhost:8086
.Establece la variable de entorno
BIGTABLE_EMULATOR_HOST
:export BIGTABLE_EMULATOR_HOST=localhost:8086
Para obtener más información sobre el uso del emulador de Bigtable, consulta Cómo realizar pruebas con el emulador.
Envía un trabajo de Spark
Usa el comando spark-submit
para enviar un trabajo de Spark sin importar si usas un emulador local de Bigtable.
Scala/Java
spark-submit $PATH_TO_COMPILED_JAR \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
spark-submit \
--jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
--packages=org.slf4j:slf4j-reload4j:1.7.36 \
$PATH_TO_PYTHON_FILE \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
Verifica los datos de la tabla
Ejecuta el siguiente comando de la CLI de cbt
para verificar que los datos se escriban en Bigtable. La
CLI de cbt
es un componente de Google Cloud CLI. Para obtener más información, consulta la
descripción general de la CLI de cbt
.
cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
read $BIGTABLE_SPARK_TABLE_NAME
Soluciones adicionales
Usa el conector de Spark de Bigtable para soluciones específicas, como la serialización de tipos complejos de Spark SQL, la lectura de filas específicas y la generación de métricas del cliente.
Leer una fila específica de DataFrame con un filtro
Cuando usas DataFrames para leer desde Bigtable, puedes especificar un filtro para leer solo filas específicas. Los filtros simples, como ==
, <=
y startsWith
en la columna de clave de fila, se aplican en el servidor para evitar un análisis de tabla completa. Los filtros en claves de fila compuestas o filtros complejos, como el filtro LIKE
en la columna de clave de fila, se aplican del lado del cliente.
Si lees tablas grandes, te recomendamos que uses filtros simples de clave de fila para evitar realizar un análisis de tabla completa. La siguiente instrucción de ejemplo muestra cómo leer con un filtro simple. Asegúrate de que, en tu filtro Spark, uses el nombre de la columna DataFrame que se convierte en la clave de fila:
dataframe.filter("id == 'some_id'").show()
Cuando apliques un filtro, usa el nombre de la columna de DataFrame en lugar del nombre de la columna de la tabla de Bigtable.
Serializa tipos de datos complejos con Apache Avro
El conector de Spark de Bigtable proporciona compatibilidad con el uso de Apache Avro para serializar tipos de Spark SQL complejos, como ArrayType
, MapType
o StructType
. Apache Avro proporciona serialización para datos de registro que se usa comúnmente para procesar y almacenar estructuras de datos complejas.
Usa una sintaxis como "avro":"avroSchema"
para especificar que una columna de Bigtable se debe codificar con Avro. Luego, puedes usar .option("avroSchema", avroSchemaString)
cuando lees o escribas en Bigtable para especificar el esquema de Avro correspondiente a esa columna en formato de string. Puedes usar diferentes nombres de opciones, por ejemplo, "anotherAvroSchema"
para diferentes columnas y pasar esquemas de Avro en varias columnas.
def catalogWithAvroColumn = s"""{
|"table":{"name":"ExampleAvroTable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
|}
|}""".stripMargin
Usa métricas del cliente
Dado que el conector Spark de Bigtable se basa en el cliente de Bigtable para Java, las métricas del cliente están habilitadas dentro del conector de forma predeterminada. Puedes consultar la documentación de las métricas del cliente para obtener más detalles sobre cómo acceder a estas métricas y cómo interpretarlas.
Usa el cliente de Bigtable para Java con funciones de RDD de bajo nivel
Dado que el conector Spark de Bigtable se basa en el cliente de Bigtable para Java, puedes usar el cliente directamente en tus aplicaciones Spark y realizar solicitudes de lectura o escritura distribuidas dentro de las funciones de RDD de bajo nivel, como mapPartitions
y foreachPartition
.
Si deseas usar el cliente de Bigtable para clases de Java, agrega el prefijo com.google.cloud.spark.bigtable.repackaged
a los nombres de los paquetes. Por ejemplo, en lugar de usar el nombre de clase como com.google.cloud.bigtable.data.v2.BigtableDataClient
, usa com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient
.
Si deseas obtener más información sobre el cliente de Bigtable para Java, consulta Cliente de Bigtable para Java.
¿Qué sigue?
- Aprende a ajustar tu trabajo de Spark en Dataproc.
- Usar las clases del cliente de Bigtable para Java con el conector de Spark de Bigtable.