Apache Kafka è una piattaforma di streaming distribuita open source per pipeline di dati e integrazione dei dati in tempo reale. Fornisce un sistema di streaming efficiente e scalabile per l'utilizzo in una varietà di applicazioni, tra cui:
- Analisi in tempo reale
- Elaborazione dei flussi
- Aggregazione dei log
- Messaggistica distribuita
- Streaming di eventi
Obiettivi
Installa Kafka su un cluster Managed Service per Apache Spark HA con ZooKeeper (in questo tutorial indicato come "cluster Managed Service per Apache Spark Kafka").
Crea dati fittizi dei clienti, quindi pubblicali in un argomento Kafka.
Crea tabelle Hive parquet e ORC in Cloud Storage per ricevere i dati degli argomenti Kafka in streaming.
Invia un job PySpark per abbonarti e trasmettere in streaming l'argomento Kafka in Cloud Storage in formato Parquet e ORC.
Esegui una query sui dati della tabella Hive in streaming per conteggiare i messaggi Kafka in streaming.
Costi
In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:
Per generare una stima dei costi in base all'utilizzo previsto,
utilizza il calcolatore prezzi.
Al termine delle attività descritte in questo documento, puoi evitare l'addebito di ulteriori costi eliminando le risorse che hai creato. Per saperne di più, consulta Esegui la pulizia.
Prima di iniziare
Se non l'hai ancora fatto, crea un progetto Google Cloud .
- Accedi al tuo account Google Cloud . Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti senza costi per l'esecuzione, il test e il deployment dei workload.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- Nella console Google Cloud , vai alla pagina Bucket in Cloud Storage.
- Fai clic su Crea.
- Nella pagina Crea un bucket, inserisci le informazioni del bucket. Per andare al passaggio
successivo, fai clic su Continua.
-
Nella sezione Inizia, segui questi passaggi:
- Inserisci un nome univoco globale che soddisfi i requisiti per la denominazione dei bucket.
- Per aggiungere un'etichetta bucket, espandi la sezione Etichette (), fai clic su add_box
Aggiungi etichetta e specifica un
keye unvalueper l'etichetta.
-
Nella sezione Scegli dove archiviare i tuoi dati, segui questi passaggi:
- Seleziona un Tipo di località.
- Scegli una posizione in cui i dati del bucket vengono archiviati in modo permanente dal menu a discesa Tipo di località.
- Se selezioni il tipo di località a due regioni, puoi anche scegliere di attivare la replica turbo utilizzando la casella di controllo pertinente.
- Per configurare la replica tra bucket, seleziona
Aggiungi una replica tra bucket mediante Storage Transfer Service e
segui questi passaggi:
Configura la replica tra bucket
- Nel menu Bucket, seleziona un bucket.
Nella sezione Impostazioni di replica, fai clic su Configura per configurare le impostazioni per il job di replica.
Viene visualizzato il riquadro Configura replica tra bucket.
- Per filtrare gli oggetti da replicare in base al prefisso del nome dell'oggetto, inserisci un prefisso da cui includere o escludere gli oggetti, quindi fai clic su Aggiungi un prefisso.
- Per impostare una classe di archiviazione per gli oggetti replicati, seleziona una classe di archiviazione dal menu Classe di archiviazione. Se salti questo passaggio, gli oggetti replicati utilizzeranno per impostazione predefinita la classe di archiviazione del bucket di destinazione.
- Fai clic su Fine.
-
Nella sezione Scegli come archiviare i tuoi dati, segui questi passaggi:
- Seleziona una classe di archiviazione predefinita per il bucket o Autoclass per la gestione automatica della classe di archiviazione dei dati del bucket.
- Per attivare lo spazio dei nomi gerarchico, nella sezione Ottimizza l'archiviazione per workload con uso intensivo dei dati, seleziona Abilita uno spazio dei nomi gerarchico in questo bucket.
- Nella sezione Scegli come controllare l'accesso agli oggetti, seleziona se il bucket applica o meno la prevenzione dell'accesso pubblico e seleziona un metodo di controllo dell'accesso per gli oggetti del bucket.
-
Nella sezione Scegli come proteggere i dati degli oggetti, segui questi passaggi:
- Seleziona una delle opzioni in Protezione dei dati che vuoi impostare per il bucket.
- Per attivare l'eliminazione temporanea, fai clic sulla casella di controllo Criterio di eliminazione temporanea (per il recupero dei dati) e specifica il numero di giorni per cui vuoi conservare gli oggetti dopo l'eliminazione.
- Per impostare il controllo delle versioni degli oggetti, seleziona la casella di controllo Controllo delle versioni degli oggetti (per il controllo delle versioni) e specifica il numero massimo di versioni per oggetto e il numero di giorni dopo i quali scadono le versioni non correnti.
- Per abilitare il criterio di conservazione su oggetti e bucket, seleziona la casella di controllo Conservazione (per la conformità), quindi procedi nel seguente modo:
- Per attivare il blocco della conservazione degli oggetti, fai clic sulla casella di controllo Abilita conservazione degli oggetti.
- Per attivare Bucket Lock, fai clic sulla casella di controllo Imposta criterio di conservazione del bucket e scegli un'unità di tempo e una durata per il periodo di conservazione.
- Per scegliere come verranno criptati i dati degli oggetti, espandi la sezione Crittografia dei dati () e seleziona un metodo di crittografia dei dati.
- Seleziona una delle opzioni in Protezione dei dati che vuoi impostare per il bucket.
-
Nella sezione Inizia, segui questi passaggi:
- Fai clic su Crea.
Passaggi del tutorial
Segui questi passaggi per creare un cluster Managed Service per Apache Spark Kafka per leggere un argomento Kafka in Cloud Storage in formato parquet O ORC.
Copia lo script di installazione di Kafka in Cloud Storage
Lo script dell'kafka.sh azione di inizializzazione
installa Kafka su un cluster Managed Service per Apache Spark.
Sfoglia il codice.
Copia lo script
kafka.shdell'azione di inizializzazione nel bucket Cloud Storage. Questo script installa Kafka su un cluster Managed Service per Apache Spark.Apri Cloud Shell ed esegui questo comando:
gcloud storage cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
Effettua le seguenti sostituzioni:
- REGION:
kafka.shè archiviato in bucket pubblici con tag regionali in Cloud Storage. Specifica una regione di Compute Engine geograficamente vicina (ad esempious-central1). - BUCKET_NAME: il nome del bucket Cloud Storage.
- REGION:
Crea un cluster Managed Service per Apache Spark Kafka
Apri Cloud Shell, poi esegui il seguente comando
gcloud dataproc clusters createper creare un cluster HA Managed Service per Apache Spark che installa i componenti Kafka e ZooKeeper:gcloud dataproc clusters create KAFKA_CLUSTER \ --project=PROJECT_ID \ --region=REGION \ --image-version=2.1-debian11 \ --num-masters=3 \ --enable-component-gateway \ --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
Note:
- KAFKA_CLUSTER: il nome del cluster, che deve essere univoco all'interno di un progetto. Il nome deve iniziare con una lettera minuscola e può contenere fino a 51 lettere minuscole, numeri e trattini. Non può terminare con un trattino. Il nome di un cluster eliminato può essere riutilizzato.
- PROJECT_ID: il progetto da associare a questo cluster.
- REGION: la
regione di Compute Engine
in cui si troverà il cluster, ad esempio
us-central1.- Puoi aggiungere il flag facoltativo
--zone=ZONEper specificare una zona all'interno della regione specificata, ad esempious-central1-a. Se non specifichi una zona, la funzionalità di posizionamento automatico delle zone di Managed Service for Apache Spark seleziona una zona con la regione specificata.
- Puoi aggiungere il flag facoltativo
--image-version: versione dell'immagine di Managed Service per Apache Spark2.1-debian11è consigliata per questo tutorial. Nota: ogni versione dell'immagine contiene un insieme di componenti preinstallati, incluso il componente Hive utilizzato in questo tutorial (vedi Versioni dell'immagine Managed Service for Apache Spark supportate).--num-master: i nodi master3creano un cluster HA. Il componente Zookeeper, richiesto da Kafka, è preinstallato su un cluster HA.--enable-component-gateway: attiva il gateway dei componenti di Managed Service per Apache Spark.- BUCKET_NAME: il nome del bucket Cloud Storage
che contiene lo script di inizializzazione
/scripts/kafka.sh(vedi Copiare lo script di installazione di Kafka in Cloud Storage).
Crea un argomento Kafka custdata
Per creare un argomento Kafka sul cluster Kafka di Managed Service per Apache Spark:
Utilizza l'utilità SSH per aprire una finestra del terminale sulla VM master del cluster.
Crea un argomento Kafka
custdata./usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --create --topic custdata
Note:
KAFKA_CLUSTER: inserisci il nome del tuo cluster Kafka.
-w-0:9092indica il broker Kafka in esecuzione sulla porta9092sul nodoworker-0.Dopo aver creato l'argomento
custdata, puoi eseguire i seguenti comandi:# List all topics. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --list
# Consume then display topic data. /usr/lib/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --topic custdata
# Count the number of messages in the topic. /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata # Delete topic. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --delete --topic custdata
Pubblica contenuti nell'argomento Kafka custdata
Il seguente script utilizza lo strumento kafka-console-producer.sh Kafka per
generare dati fittizi dei clienti in formato CSV.
Copia e incolla lo script nel terminale SSH sul nodo master del cluster Kafka. Premi <return> per eseguire lo script.
for i in {1..10000}; do \ custname="cust name${i}" uuid=$(dbus-uuidgen) age=$((45 + $RANDOM % 45)) amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))") message="${uuid}:${custname},${age},${amount}" echo ${message} done | /usr/lib/kafka/bin/kafka-console-producer.sh \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata \ --property "parse.key=true" \ --property "key.separator=:"Note:
- KAFKA_CLUSTER: il nome del cluster Kafka.
Esegui questo comando Kafka per verificare che l'argomento
custdatacontenga 10.000 messaggi./usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata
Note:
- KAFKA_CLUSTER: il nome del cluster Kafka.
Output previsto:
custdata:0:10000
Crea tabelle Hive in Cloud Storage
Crea tabelle Hive per ricevere i dati degli argomenti Kafka in streaming.
Segui questi passaggi per creare tabelle Hive cust_parquet (parquet) e
cust_orc (ORC) nel bucket Cloud Storage.
Inserisci il tuo BUCKET_NAME nel seguente script, quindi copia e incolla lo script nel terminale SSH sul nodo master del cluster Kafka, poi premi <return> per creare uno script
~/hivetables.hql(Hive Query Language).Nel passaggio successivo eseguirai lo script
~/hivetables.hqlper creare tabelle Hive parquet e ORC nel bucket Cloud Storage.cat > ~/hivetables.hql <<EOF drop table if exists cust_parquet; create external table if not exists cust_parquet (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as parquet location "gs://BUCKET_NAME/tables/cust_parquet"; drop table if exists cust_orc; create external table if not exists cust_orc (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as orc location "gs://BUCKET_NAME/tables/cust_orc"; EOF
Nel terminale SSH sul nodo master del cluster Kafka, invia il job Hive
~/hivetables.hqlper creare le tabelle Hivecust_parquet(parquet) ecust_orc(ORC) nel bucket Cloud Storage.gcloud dataproc jobs submit hive \ --cluster=KAFKA_CLUSTER \ --region=REGION \ -f ~/hivetables.hql
Note:
- Il componente Hive è preinstallato nel cluster Managed Service per Apache Spark Kafka. Consulta le versioni di rilascio 2.1.x per un elenco delle versioni dei componenti Hive incluse nelle immagini 2.1 rilasciate di recente.
- KAFKA_CLUSTER: il nome del cluster Kafka.
- REGION: la regione in cui si trova il cluster Kafka.
Trasmetti in streaming Kafka custdata alle tabelle Hive
- Esegui il comando seguente nel terminale SSH sul nodo master del cluster Kafka per installare la libreria
kafka-python. È necessario un client Kafka per trasmettere i dati degli argomenti Kafka a Cloud Storage.pip install kafka-python
Inserisci il tuo BUCKET_NAME, quindi copia e incolla il seguente codice PySpark nel terminale SSH sul nodo master del cluster Kafka e premi <return> per creare un file
streamdata.py.Lo script si iscrive all'argomento Kafka
custdata, quindi trasmette i dati alle tabelle Hive in Cloud Storage. Il formato di output, che può essere Parquet o ORC, viene passato allo script come parametro.cat > streamdata.py <<EOF #!/bin/python import sys from pyspark.sql.functions import * from pyspark.sql.types import * from pyspark.sql import SparkSession from kafka import KafkaConsumer def getNameFn (data): return data.split(",")[0] def getAgeFn (data): return data.split(",")[1] def getAmtFn (data): return data.split(",")[2] def main(cluster, outputfmt): spark = SparkSession.builder.appName("APP").getOrCreate() spark.sparkContext.setLogLevel("WARN") Logger = spark._jvm.org.apache.log4j.Logger logger = Logger.getLogger(__name__) rows = spark.readStream.format("kafka") \ .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \ .option("startingOffsets", "earliest")\ .load() getNameUDF = udf(getNameFn, StringType()) getAgeUDF = udf(getAgeFn, StringType()) getAmtUDF = udf(getAmtFn, StringType()) logger.warn("Params passed in are cluster name: " + cluster + " output format(sink): " + outputfmt) query = rows.select (col("key").cast("string").alias("uuid"),\ getNameUDF (col("value").cast("string")).alias("custname"),\ getAgeUDF (col("value").cast("string")).alias("age"),\ getAmtUDF (col("value").cast("string")).alias("amount")) writer = query.writeStream.format(outputfmt)\ .option("path","gs://BUCKET_NAME/tables/cust_"+outputfmt)\ .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \ .outputMode("append")\ .start() writer.awaitTermination() if __name__=="__main__": if len(sys.argv) < 2: print ("Invalid number of arguments passed ", len(sys.argv)) print ("Usage: ", sys.argv[0], " cluster format") print ("e.g.: ", sys.argv[0], " <cluster_name> orc") print ("e.g.: ", sys.argv[0], " <cluster_name> parquet") main(sys.argv[1], sys.argv[2]) EOFNel terminale SSH sul nodo master del cluster Kafka, esegui
spark-submitper trasmettere i dati in streaming alle tabelle Hive in Cloud Storage.Inserisci il nome del tuo KAFKA_CLUSTER e dell'output FORMAT, quindi copia e incolla il seguente codice nel terminale SSH sul nodo master del tuo cluster Kafka, quindi premi <return> per eseguire il codice e trasmettere in streaming i dati
custdatadi Kafka in formato Parquet alle tue tabelle Hive in Cloud Storage.spark-submit --packages \ org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \ --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \ --conf spark.driver.memory=4096m \ --conf spark.executor.cores=2 \ --conf spark.executor.instances=2 \ --conf spark.executor.memory=6144m \ streamdata.py KAFKA_CLUSTER FORMATNote:
- KAFKA_CLUSTER: inserisci il nome del tuo cluster Kafka.
- FORMAT: specifica
parquetoorccome formato di output. Puoi eseguire il comando in successione per trasmettere in streaming entrambi i formati alle tabelle Hive: ad esempio, nella prima chiamata, specificaparquetper trasmettere in streaming l'argomento Kafkacustdataalla tabella Hive parquet; poi, nella seconda chiamata, specifica il formatoorcper trasmettere in streamingcustdataalla tabella Hive ORC.
Quando l'output standard si interrompe nel terminale SSH, il che indica che tutto il
custdataè stato trasmesso in streaming, premi <control-c> nel terminale SSH per interrompere il processo.Elenca le tabelle Hive in Cloud Storage.
gcloud storage ls gs://BUCKET_NAME/tables/* --recursive
Note:
- BUCKET_NAME: inserisci il nome del bucket Cloud Storage che contiene le tabelle Hive (vedi Creare tabelle Hive).
Eseguire query sui dati in streaming
Nel terminale SSH sul nodo master del cluster Kafka, esegui il seguente comando
hiveper conteggiare i messaggi Kafkacustdatatrasmessi in streaming nelle tabelle Hive in Cloud Storage.hive -e "select count(1) from TABLE_NAME"
Note:
- TABLE_NAME: specifica
cust_parquetocust_orccome nome della tabella Hive.
Snippet di output previsto:
- TABLE_NAME: specifica
...
Status: Running (Executing on YARN cluster with App id application_....)
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 1 1 0 0 0 0
Reducer 2 ...... container SUCCEEDED 1 1 0 0 0 0
----------------------------------------------------------------------------------------------
VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 9.89 s
----------------------------------------------------------------------------------------------
OK
10000
Time taken: 21.394 seconds, Fetched: 1 row(s)
Esegui la pulizia
Elimina il progetto
Elimina un progetto Google Cloud :
gcloud projects delete PROJECT_ID
Elimina risorse
-
Elimina il bucket:
gcloud storage buckets delete BUCKET_NAME
- Elimina il cluster Kafka:
gcloud dataproc clusters delete KAFKA_CLUSTER \ --region=${REGION}