Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
I checkpoint e i log write-ahead interagiscono per offrire garanzie di elaborazione per i carichi di lavoro Structured Streaming. Il checkpoint tiene traccia delle informazioni che identificano la query, incluse le informazioni di stato e i record elaborati. Quando si eliminano i file nella directory del checkpoint o si passa a un nuovo percorso del checkpoint, inizia una nuova esecuzione della query.
Una cartella del checkpoint contiene quanto segue:
- Offset: Gli offset di origine elaborati in ogni micro-batch. In questo modo la query può riprendere esattamente da dove è stata interrotta senza rielaborare i dati.
- Commit: record di cui è stato eseguito il commit di micro batch nel sink, abilitando la semantica di tipo exactly-once.
-
Stato: per le query con stato (aggregazioni, join tra flussi, deduplicazione e operatori personalizzati con stato come
transformWithState), il checkpoint archivia i metadati relativi all'operatore con stato, allo schema dello stato e al contenuto dell'archivio di stati gestito dal provider dell'archivio stati. - Metadati: ID di richiesta univoco usato per identificare la query. Le impostazioni di configurazione vengono conservate come parte del registro di offset.
Ogni query deve avere un percorso di checkpoint diverso. Più query non devono mai condividere la stessa posizione.
Nota
Questo articolo illustra i checkpoint di Structured Streaming per le query di streaming. Per informazioni sull'uso DataFrame.checkpoint() con i volumi di Unity Catalog per troncare i piani di esecuzione dei dataframe non in streaming, vedere Checkpoint del dataframe nei volumi.
Abilitare il checkpoint per le query di Structured Streaming
È necessario specificare l'checkpointLocationopzione prima di eseguire una query di streaming, come nell'esempio seguente:
Python
(df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
)
Scala
df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
Nota
Alcuni sink, ad esempio l'output per display() nei notebook e il sink memory, generano automaticamente un percorso di checkpoint temporaneo se si omette questa opzione. Questi percorsi di checkpoint temporanei non garantiscono alcuna tolleranza di errore o garanzie di coerenza dei dati e potrebbero non essere puliti correttamente. Databricks consiglia di specificare sempre una posizione del checkpoint per questi sink.
Eseguire il ripristino dopo le modifiche in una query Structured Streaming
Esistono limitazioni sulle modifiche apportate a una query di streaming tra i riavvii dalla stessa posizione del checkpoint.
Le modifiche che in genere richiedono un nuovo checkpoint includono il numero o il tipo di origini di input, i topic Kafka sottoscritti o i percorsi dell'Auto Loader, i tipi di operazione con stato, lo schema di stato e il tipo di destinazione di output.
Le modifiche generalmente sicure includono l'aggiunta o la rimozione di filtri, la modifica dei limiti di frequenza, gli intervalli di trigger e l'aggiornamento della logica di funzione definita dall'utente all'interno mapGroupsWithState (anche se la semantica può cambiare).
La sezione seguente descrive le modifiche non consentite o l'effetto della modifica non è ben definito, dove:
- Il termine consentito significa che è possibile eseguire la modifica specificata, ma se la semantica del suo effetto è ben definita o no dipende dalla query e dalla modifica.
- Il termine non consentito indica che non è consigliabile eseguire la modifica specificata perché è probabile che la query riavviata abbia esito negativo con errori imprevedibili.
-
sdfrappresenta un dataframe di streaming/set di dati generato consparkSession.readStream.
Tipi di modifiche nelle query Structured Streaming
Modifiche al numero o al tipo di origini di input: questo non è consentito per impostazione predefinita perché Structured Streaming identifica le origini in base alla posizione nel piano di query. Se si attiva la denominazione dell'origine, è possibile riordinare le origini esistenti e aggiungere nuove origini senza iniziare da un nuovo checkpoint. Vedere Modificare le origini di streaming con l'evoluzione dell'origine.
Modifiche nei parametri delle origini di input: La decisione se sia consentito e se la semantica della modifica sia ben definita dipende dall'origine e dalla query, inclusi controlli di ammissione come
maxFilesPerTriggeromaxOffsetsPerTrigger. Ecco alcuni esempi:È consentito aggiungere, eliminare e modificare i limiti di frequenza:
spark.readStream.format("kafka").option("subscribe", "article")to
spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)Per informazioni dettagliate, vedere Configurare le dimensioni batch di Structured Streaming in Azure Databricks
Le modifiche apportate agli articoli e ai file sottoscritti non sono in genere consentite perché i risultati sono imprevedibili:
spark.readStream.format("kafka").option("subscribe", "article")aspark.readStream.format("kafka").option("subscribe", "newarticle")
Modifiche nell'intervallo di trigger: è possibile modificare i trigger tra batch incrementali e intervalli di tempo. Vedere Modificare gli intervalli di trigger tra le esecuzioni.
Modifiche nel tipo di sink di output: sono consentite modifiche tra alcune combinazioni specifiche di sink. Questo deve essere verificato caso per caso. Ecco alcuni esempi.
- È consentito il passaggio da sink di file a sink Kafka. Kafka vedrà solo i nuovi dati.
- La conversione da sink Kafka a sink di file non è consentita.
- La modifica del sink Kafka in foreach o viceversa è consentita.
Modifiche nei parametri del sink di output: Indica se è consentito e se la semantica della modifica è ben definita, dipende dal sink e dalla query. Ecco alcuni esempi.
- Le modifiche alla directory di output di un file sink non sono consentite:
sdf.writeStream.format("parquet").option("path", "/somePath")asdf.writeStream.format("parquet").option("path", "/anotherPath") - Le modifiche apportate all'argomento di output sono consentite:
sdf.writeStream.format("kafka").option("topic", "topic1")asdf.writeStream.format("kafka").option("topic", "topic2") - Le modifiche apportate al sink foreach definito dall'utente(ovvero il
ForeachWritercodice) sono consentite, ma la semantica della modifica dipende dal codice.
- Le modifiche alla directory di output di un file sink non sono consentite:
Modifiche apportate alle operazioni di proiezione/filtro/mapping: alcuni casi sono consentiti. Per esempio:
- L'aggiunta o l'eliminazione dei filtri è consentita:
sdf.selectExpr("a")asdf.where(...).selectExpr("a").filter(...). - Le modifiche apportate alle proiezioni con lo stesso schema di output sono consentite:
sdf.selectExpr("stringColumn AS json").writeStreamasdf.select(to_json(...).as("json")).writeStream. - Le modifiche apportate alle proiezioni con schema di output diverso sono consentite in modo condizionale:
sdf.selectExpr("a").writeStreamasdf.selectExpr("b").writeStreamè consentito solo se il sink di output consente la modifica dello schema da"a"a"b".
- L'aggiunta o l'eliminazione dei filtri è consentita:
Modifiche nelle operazioni con stato: alcune operazioni nelle query di streaming devono mantenere i dati sullo stato per aggiornare continuamente il risultato. Structured Streaming esegue automaticamente il checkpoint dei dati di stato nell'archiviazione con tolleranza agli errori (ad esempio, DBFS, archiviazione BLOB di Azure) e lo ripristina automaticamente dopo il riavvio. Tuttavia, si presuppone che lo schema dei dati di stato rimanga invariato tra i riavvii. Ciò significa che eventuali modifiche ( ovvero aggiunte, eliminazioni o modifiche dello schema) alle operazioni con stato di una query di streaming non sono consentite tra riavvii. Di seguito è riportato l'elenco delle operazioni con stato il cui schema non deve essere modificato tra i riavvii per garantire il ripristino dello stato:
-
Aggregazione di streaming: ad esempio,
sdf.groupBy("a").agg(...). Non è consentito modificare il numero o il tipo di chiavi di raggruppamento o aggregazioni. -
Deduplicazione dello streaming: ad esempio,
sdf.dropDuplicates("a"). Non è consentito modificare il numero o il tipo di chiavi di raggruppamento o aggregazioni. -
join tra flussi: ad esempio,
sdf1.join(sdf2, ...)(ovvero, entrambi gli input sono generati consparkSession.readStream). Le modifiche apportate allo schema o alle colonne di equi-join non sono consentite. Le modifiche apportate al tipo di join (esterno o interno) non sono consentite. Altre modifiche nella condizione di join non sono definite correttamente. -
Operazione arbitraria con stato: ad esempio,
sdf.groupByKey(...).mapGroupsWithState(...)osdf.groupByKey(...).flatMapGroupsWithState(...). Qualsiasi modifica allo schema dello stato definito dall'utente e il tipo di timeout non è consentito. Qualsiasi modifica all'interno della funzione di mapping dello stato definita dall'utente è consentita, ma l'effetto semantico della modifica dipende dalla logica definita dall'utente. Se si vuole effettivamente supportare le modifiche dello schema di stato, è possibile codificare/decodificare in modo esplicito le strutture di dati di stato complesse in byte usando uno schema di codifica/decodifica che supporta la migrazione dello schema. Ad esempio, se si salva lo stato come byte con codifica Avro, è possibile modificare lo schema avro-state-schema tra i riavvii della query perché ripristina lo stato binario.
-
Aggregazione di streaming: ad esempio,
Importante
Gli operatori a stato dropDuplicates() e dropDuplicatesWithinWatermark() possono non essere riavviati a causa di un controllo di compatibilità dello schema di stato durante la modifica delle modalità di accesso di calcolo.
È consentita la modifica tra le modalità di accesso dedicato e senza isolamento. È consentita la modifica tra le modalità di accesso standard e serverless. Non tentare di passare da altre combinazioni di modalità di accesso.
Per evitare questo errore, non modificare la modalità di accesso di calcolo per le query di streaming che contengono questi operatori.
Cambia le sorgenti di streaming con l'evoluzione della sorgente
Per impostazione predefinita, Structured Streaming identifica le origini in base alla posizione nel piano di query, ad esempio 0, 1, 2e così via. Qualsiasi modifica apportata al numero o all'ordine delle origini di input interrompe la compatibilità dei checkpoint e richiede un nuovo checkpoint. L'evoluzione dell'origine consente di assegnare nomi stabili e definiti dall'utente a ogni origine di streaming in modo da poter riordinare, aggiungere o rimuovere origini da una query senza perdere lo stato del checkpoint.
L'evoluzione dell'origine dati richiede Databricks Runtime 18.2 o versioni successive.
Configurazione richiesta
Per abilitare l'evoluzione della sorgente, imposta due configurazioni di Spark:
-
spark.sql.streaming.queryEvolution.enableSourceEvolution: quandotrue, tutte le origini di streaming nella query devono essere denominate in modo esplicito usando l'API.name(). Il valore predefinito èfalse. -
spark.sql.streaming.offsetLog.formatVersion: deve essere impostato su2per utilizzare il formato di rilevamento offset basato sul nome. Il valore predefinito è1.
Impostare entrambe le configurazioni prima di definire la query di streaming:
spark.conf.set("spark.sql.streaming.queryEvolution.enableSourceEvolution", "true")
spark.conf.set("spark.sql.streaming.offsetLog.formatVersion", "2")
Regole di denominazione
- I nomi devono contenere solo caratteri alfanumerici e caratteri di sottolineatura (
[a-zA-Z0-9_]+). - Ogni nome di origine deve essere univoco all'interno di una query.
- Quando l'evoluzione dell'origine è abilitata, ogni origine di streaming deve avere un nome. Le sorgenti senza nome causano un errore
UNNAMED_STREAMING_SOURCES_WITH_ENFORCEMENT.
Riordinare, aggiungere e rimuovere le fonti
Le seguenti modifiche sono sicure nei riavvii delle query con lo stesso checkpoint:
- Riordinare le origini: riavviare la query con un ordine di origini diverso. Ogni sorgente riprende dall'ultimo offset sottoposto a commit in base al proprio nome e non modifica lo stato del checkpoint.
- Aggiungere nuove origini: riavviare la query con una nuova origine. Le nuove sorgenti vengono elaborate dall'inizio e le sorgenti esistenti continuano dai loro ultimi offset.
- Rimuovi origini: riavviare la query senza l'origine. La sorgente viene rimossa in modo permanente dal checkpoint. Non è possibile aggiungere nuovamente un'origine rimossa con lo stesso nome.
Example
Usare .name() su DataStreamReader prima di chiamare .load() o .table():
Python
orders_us = (spark.readStream
.name("orders_us")
.table("catalog.schema.orders_us")
)
orders_eu = (spark.readStream
.name("orders_eu")
.table("catalog.schema.orders_eu")
)
all_orders = orders_us.union(orders_eu)
Scala
val ordersUS = spark.readStream
.name("orders_us")
.table("catalog.schema.orders_us")
val ordersEU = spark.readStream
.name("orders_eu")
.table("catalog.schema.orders_eu")
val allOrders = ordersUS.union(ordersEU)
Limitations
- Il nome della sorgente richiede una nuova verifica. Non è possibile abilitare l'evoluzione della sorgente con un checkpoint esistente che utilizza il formato del log degli offset V1.
- Dopo un aggiornamento al formato di log di offset V2, non è possibile eseguire il downgrade alla versione 1. Vedere Configurazione richiesta.
- I nomi di origine sono permanenti. Per rinominare un'origine, rimuoverla e quindi aggiungerla con un nuovo nome. Processi di origine rinominati dall'inizio.