Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
È possibile usare le operazioni dataframe o le funzioni con valori di tabella SQL per eseguire query sui dati e sui metadati dello stato structured streaming. Usare queste funzioni per osservare le informazioni sullo stato per le query con stato Structured Streaming, che possono essere utili per il monitoraggio e il debug.
È necessario avere accesso in lettura al percorso del checkpoint per una query in streaming al fine di eseguire query sui dati di stato o sui metadati. Le funzioni descritte in questo articolo forniscono l'accesso in sola lettura ai dati e ai metadati di stato. È possibile usare solo la semantica di lettura batch per eseguire query sulle informazioni sullo stato.
Nota
Non è possibile eseguire query sulle informazioni sullo stato per le pipeline dichiarative di Lakeflow Spark, le tabelle di streaming o le viste materializzate. Non è possibile eseguire query sulle informazioni di stato utilizzando l'elaborazione serverless o l'elaborazione configurata con la modalità di accesso standard.
Requisiti
- Usare una delle configurazioni di calcolo seguenti:
- Databricks Runtime 16.3 e versioni successive nel calcolo configurato con la modalità di accesso standard.
- Databricks Runtime 14.3 LTS e versioni successive nel calcolo configurato con modalità di accesso dedicato o senza isolamento.
- Accesso in lettura al percorso del checkpoint usato dalla query di streaming.
Leggere l'archivio di stato di Structured Streaming
È possibile leggere le informazioni sull'archivio di stato per le query Structured streaming eseguite in qualsiasi runtime di Databricks supportato. Usare la sintassi seguente:
Pitone
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
Scala
val df = spark.read
.format("statestore")
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore('/checkpoint/path')
Opzioni dell'API lettore di stato e schema
Per un elenco completo delle opzioni di formato statestore, vedi Archivio di stato.
I dati di output hanno lo schema seguente:
| Colonna | Tipo | Descrizione |
|---|---|---|
key |
Struttura (ulteriore tipo derivato dalla chiave di stato) | La chiave per un record di un operatore con stato nel punto di controllo dello stato. |
value |
Struttura (ulteriore tipo derivato dal valore di stato) | Il valore di un record dell'operatore con stato nel checkpoint di stato. |
partition_id |
Intero | Partizione del checkpoint di stato che contiene il record dell'operatore con stato. |
In Databricks Runtime 16.4 LTS e versioni successive, quando l'opzione readChangeFeed è impostata su true, i dati di output hanno lo schema seguente:
| Colonna | Tipo | Descrizione |
|---|---|---|
batch_id |
Lungo | ID batch a cui appartiene la modifica dello stato. |
change_type |
Stringa | Tipo di modifica applicata dal batch: update per inserimenti e aggiornamenti, delete per le eliminazioni. |
key |
Struttura (ulteriore tipo derivato dalla chiave di stato) | La chiave per un record di un operatore con stato nel punto di controllo dello stato. |
value |
Struttura (ulteriore tipo derivato dal valore di stato) | Il valore di un record dell'operatore con stato nel checkpoint di stato.
null per i record in cui change_type è delete. |
partition_id |
Intero | Partizione del checkpoint di stato che contiene il record dell'operatore con stato. |
Vedere read_statestore funzione con valori di tabella.
Leggere le modifiche dello stato di Structured Streaming
Disponibile in Databricks Runtime 16.4 LTS e versioni successive. Per leggere il modo in cui lo stato cambia tra microbatches invece di visualizzare lo stato completo in un singolo microbatch, impostare readChangeFeed su true e specificare changeStartBatchId. Facoltativamente, specificare changeEndBatchId. Per un elenco completo delle opzioni, consulta State store.
Ad esempio, per leggere le modifiche dello stato dal batch 2 tramite il batch di cui è stato eseguito il commit più recente:
Pitone
df = (spark.read
.format("statestore")
.option("readChangeFeed", True)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
)
Scala
val df = spark.read
.format("statestore")
.option("readChangeFeed", true)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
SQL
SELECT * FROM read_statestore(
'<checkpointLocation>',
readChangeFeed => true,
changeStartBatchId => 2
);
Lo schema di output include colonne aggiuntive batch_id e change_type. Per lo schema completo, vedere Opzioni e schema dell'API di lettura dello stato.
Leggere i metadati di stato di Streaming strutturato
Disponibile in Databricks Runtime 14.3 LTS o versione successiva. È possibile leggere le informazioni sui metadati di stato per le query structured streaming:
Pitone
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
Scala
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
I dati restituiti hanno lo schema seguente:
| Colonna | Tipo | Descrizione |
|---|---|---|
operatorId |
Intero | Numero ID intero dell'operatore stateful di streaming. |
operatorName |
Stringa | Nome dell'operatore di streaming con stato. |
stateStoreName |
Stringa | Nome dell'archivio di stato dell'operatore. |
numPartitions |
Intero | Numero di partizioni dell'archivio di stato. |
minBatchId |
Lungo | ID batch minimo disponibile per l'interrogazione dello stato. |
maxBatchId |
Lungo | L'ID batch massimo disponibile per l'interrogazione dello stato. |
Nota
I valori dell'ID batch forniti da minBatchId e maxBatchId riflettono lo stato al momento della scrittura del checkpoint. I batch precedenti vengono puliti automaticamente con l'esecuzione di micro batch, quindi il valore fornito qui non è garantito che sia ancora disponibile.
Vedere read_state_metadata funzione con valori di tabella.
Esempio: Eseguire una query su un lato di un join tra flussi
Usare la sintassi seguente per eseguire una query sul lato sinistro di un join tra flussi.
Pitone
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))
Scala
val leftDf = spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
joinSide => 'left'
);
Esempio: Eseguire query nell'archivio di stato per il flusso con più operatori con stato
Questo esempio usa il lettore di metadati di stato per raccogliere i dettagli dei metadati di una query di streaming con più operatori con stato, quindi usa i risultati dei metadati come opzioni per il lettore di stato.
Il lettore di metadati di stato accetta il percorso del checkpoint come unica opzione, come nell'esempio di sintassi seguente:
Pitone
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
Scala
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
La tabella seguente rappresenta un esempio di output dei metadati dell'archivio di stato.
| operatorId | operatorName | statoNomeNegozio | numPartitions | minBatchId | maxBatchId |
|---|---|---|---|---|---|
| 0 | stateStoreSalva | impostazione predefinita | 200 | 0 | 13 |
| 1 | dedupeWithinWatermark | impostazione predefinita | 200 | 0 | 13 |
Per ottenere i risultati per l'operatore dedupeWithinWatermark , eseguire una query sul lettore di stato con l'opzione operatorId , come nell'esempio seguente:
Pitone
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))
Scala
val leftDf = spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
operatorId => 1
);