Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Important
Umgebungsversionen für SDP befinden sich in der Betaversion.
Pipelines mit einer environment-Version legen fest, Python Code über Spark Connect auszuführen. Diese Seite behandelt, was inkompatibel ist, was sich anders verhält, wie eine Pipeline nach betroffenen Mustern durchsucht wird und wie eine vorhandene Pipeline migriert wird.
Einschränkungen
Umgebungsversionen sind noch nicht mit allen Pipelinefunktionen kompatibel. Eine Pipelineausführung mit einem Umgebungsversionssatz schlägt fehl, wenn der Python Code der Pipeline einen der folgenden Aktionen ausführt:
- Mutiert den Spark-Sitzungszustand innerhalb einer Funktion, die mit einem Pipeline-Dekorateur versehen ist. Beispiele sind :
spark.conf.set(...),spark.sql("USE CATALOG ...")undcreateOrReplaceTempView. - Verwendet PySpark-APIs, die in Spark Connect nicht verfügbar sind, einschließlich
SparkContext,RDD,SQLContextund alle Py4J-APIs. Erfahren Sie , was in Spark Connect unterstützt wird.
Wenn die Aktivierung einer Umgebungsversion für eine Pipeline zu einem Fehler führt, gibt das Deaktivieren der Umgebungsversion die Pipeline in den vorherigen Zustand zurück.
Verhaltensänderungen
Spark Connect weist eine kleine Anzahl von Verhaltensunterschieden von der klassischen PySpark-Laufzeit auf. Die vollständige Referenz finden Sie unter Spark Connect im Vergleich zum klassischen Spark . Die Kompatibilitätsüberprüfung erkennt diese Muster vorab und blockiert die Aktivierung, bis sie behoben werden, sodass Sie sie finden und beheben können, bevor sie sich auf Produktionsdaten auswirken.
In einer Pipeline sind die häufigsten Situationen, in denen sich das Verhalten unterscheiden kann:
- Interleaved DataFrame-Konstruktion und Sitzungsmutation
- UDFs, die auf änderbare Python Zustand verweisen
Interleaved DataFrame-Konstruktion und Sitzungsmutation
Wenn eine Pipeline einen DataFrame erstellt, ändert sich der Spark-Sitzungszustand (z. B. ändert den Standardkatalog oder das Standardschema, legt eine Konfiguration fest, ersetzt eine temporäre Ansicht oder registriert eine UDF erneut), und verwendet dann den DataFrame:
- Ohne Eine Umgebungsversion verwendet DataFrame den Sitzungszustand vor der Mutation .
- Mit einer Umgebungsversion verwendet DataFrame den Sitzungszustand nach der Mutation .
Beispiel:
from pyspark import pipelines as dp
spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
df = spark.sql("SELECT * FROM my_view")
spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
@dp.materialized_view
def mytable():
return df
Enthält ohne eine Umgebungsversion mytable[(1, "Original Row")]. Enthält eine Umgebungsversion mytable[(2, "Replaced Row")].
UDFs, die auf änderbare Python Zustand verweisen
Wenn eine UDF auf eine Python globale Variable verweist, deren Wert sich ändert, nachdem die UDF definiert wurde:
- Ohne Eine Umgebungsversion verwendet die UDF den neuesten Wert der Variablen.
- Bei einer Umgebungsversion verwendet die UDF den Wert zum Zeitpunkt der Definition der UDF.
Beispiel:
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf
suffix = "a"
@udf
def my_udf(s):
return s + suffix
suffix = "b"
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))
Enthält ohne eine Umgebungsversion my_mv[("alex_b",)]. Enthält eine Umgebungsversion my_mv[("alex_a",)].
Wenn eine Pipeline von beiden Mustern abhängig ist, überwachen Sie sie, bevor Sie eine Umgebungsversion aktivieren.
Kompatibilitätsscan
Die Kompatibilitätsüberprüfung hilft Ihnen beim Auffinden von Codemustern in Ihrer Pipeline, die unterschiedliche Ergebnisse unter einer Umgebungsversion erzeugen würden, bevor Sie sie aktivieren. Der Scan ist opt-in. Wenn der Scan in einer Pipeline aktiviert ist:
- Jede Pipelineausführung gibt ein
BehaviorChangeInSparkConnectWARNEreignis im Pipelineereignisprotokoll pro erkannten Muster aus. - Sie können eine Umgebungsversion in der Pipeline erst aktivieren, wenn Sie alle Kompatibilitätswarnungen aus dem vorherigen erfolgreichen Update beheben.
Wenn der Scan nicht aktiviert ist, werden keine Ereignisse ausgegeben und environment_version die Aktivierung wird nicht blockiert. Databricks empfiehlt, die Überprüfung und Auflösung von erkannten Mustern zu aktivieren, bevor eine Umgebungsversion in der Pipeline aktiviert wird.
Aktivieren des Scans in einer Pipeline
Sie können die Kompatibilitätsüberprüfung aktivieren, indem Sie die pipelines.environmentVersion.enableCompatibilityScan Pipelinekonfiguration hinzufügen. Sie können die Konfiguration über die Pipeline-Editor-Benutzeroberfläche hinzufügen oder einen Eintrag zu der Pipelinekonfigurations-JSON hinzufügen.
Über die Benutzeroberfläche:
- Klicken Sie im Pipeline-Editor auf "Einstellungen".
- Suchen Sie den Abschnitt "Konfiguration" in den Pipelineeinstellungen.
- Klicken Sie auf
Konfiguration hinzufügen.
- Geben Sie
pipelines.environmentVersion.enableCompatibilityScanals Schlüssel undtrueals Wert ein. - Speichern Sie die Pipelineeinstellungen.
In der Pipeline JSON:
Fügen Sie dem Block den folgenden Eintrag hinzu configuration :
"configuration": {
"pipelines.environmentVersion.enableCompatibilityScan": "true"
}
Empfohlener Workflow
- Aktivieren Sie den Scan in der Pipeline.
- Auslösen einer Pipelineausführung
-
Abfragen des Pipelineereignisprotokolls für
BehaviorChangeInSparkConnectWARNEreignisse. Eine vollständige Liste der Problemcodes, Beispielmuster und vorgeschlagene Korrekturen finden Sie in der Referenz zu Kompatibilitätsereignissen . - Aktualisieren Sie den Pipelinecode, um die erkannten Muster zu entfernen und die Pipeline erneut auszuführen, bis keine weiteren Ereignisse ausgegeben werden.
- Fügen Sie
environment_versionder Pipeline eine der Methoden in "Aktivieren einer Umgebungsversion in einer Pipeline" hinzu.
Wenn Sie glauben, dass eine Kompatibilitätswarnung falsch positiv ist und trotzdem aktiviert environment_version werden soll, entfernen Sie den pipelines.environmentVersion.enableCompatibilityScan Eintrag aus der Pipelinekonfiguration, um die Überprüfung zu umgehen. (Das Festlegen des Werts auf false "Nicht zulässig" – Sie müssen den Eintrag vollständig entfernen.)
Die Preflight-Überprüfung wird nicht für Pipelines ausgeführt, die kein vorheriges Update aufweisen, oder für Pipelines, die bereits eine Umgebungsversion festgelegt haben.
Migrieren einer vorhandenen Pipeline zu Umgebungsversionen
Um eine vorhandene Pipeline zu migrieren, die noch keine Umgebungsversion verwendet, folgen Sie diesem End-to-End-Workflow. Es führt Sie durch die Suche nach Codemustern, die sich unter Spark Connect möglicherweise anders verhalten, sie reparieren und die Umgebungsversion sicher bereitstellen.
Aktivieren Sie den Kompatibilitätsscan für die Pipeline. Aktivieren Sie den Scan in der Pipeline, wie in der Kompatibilitätsüberprüfung beschrieben. Dies bewirkt, dass erkannte Muster im Ereignisprotokoll angezeigt werden und was die Preflight-Überprüfung ermöglicht, die den Aktivierungsversuch schützt.
Auslösen einer Pipelineausführung und Überprüfen von Kompatibilitätsereignissen. Auslösen eines normalen Pipelineupdates. Nachdem es erfolgreich abgeschlossen wurde, fragen Sie das Pipelineereignisprotokoll nach
BehaviorChangeInSparkConnectWARNEreignissen ab. Jedes Ereignis meldet ein erkanntes Muster. Eine vollständige Liste der Problemcodes, Beispielmuster und vorgeschlagene Korrekturen finden Sie in der Referenz zu Kompatibilitätsereignissen .Aktualisieren Sie Den Pipelinecode, um erkannte Muster zu beheben. Aktualisieren Sie für jedes erkannte Muster den Pipelinecode nach dem vorgeschlagenen Fix. Lösen Sie nach jeder Änderung ein weiteres Pipelineupdate aus, und überprüfen Sie, ob die entsprechenden Ereignisse nicht mehr angezeigt werden. Wiederholen Sie diesen Vorgang, bis das Ereignisprotokoll keine Kompatibilitätsereignisse mehr für ein erfolgreiches Update enthält.
Aktivieren Sie die Umgebungsversion in der Pipeline. Nachdem das neueste erfolgreiche Update keine Kompatibilitätsereignisse aufweist, fügen Sie
environment_versionder Pipeline mithilfe der Benutzeroberfläche, API oder des Bundles hinzu, wie unter "Aktivieren einer Umgebungsversion für eine Pipeline" beschrieben. Das nächste Update wird mit Spark Connect und der angehefteten Python Sprachversion und vorinstallierten Bibliotheken ausgeführt.Wenn das Update fehlschlägt, da noch Kompatibilitätswarnungen vorhanden sind, legen Sie die
environment_versionWarnungen ab, kehren Sie zu Schritt 2 zurück, und beheben Sie die verbleibenden Warnungen, bevor Sie es erneut versuchen.Überprüfen Sie die Migration. Überprüfen Sie nach Abschluss des ersten Updates mit der Umgebungsversion Folgendes:
- Das
create_updateEreignis im Ereignisprotokoll zeigtenvironment_versionden erwarteten Wert an. - Die Pipeline erzeugt die erwarteten Daten, und es werden keine neuen Fehlerereignisse angezeigt.
- Nachgeschaltete Spotüberprüfungstabellen für subtile Verhaltensunterschiede, die in Verhaltensänderungen beschrieben werden.
- Das
Rollback
Wenn die Pipeline nach der Migration falsch funktioniert, entfernen Sie die environment_version Aus den Pipelineeinstellungen. Das nächste Update wird mit der vorherigen Python Laufzeitkonfiguration ausgeführt. Verwenden Sie die Rollback-Ausführung zum Debuggen, und wiederholen Sie dann die Migration aus Schritt 2, nachdem Sie das Problem identifiziert und behoben haben.
Referenz zu Kompatibilitätsereignissen
Wenn die Kompatibilitätsüberprüfung für eine Pipeline aktiviert ist, gibt SDP ein BehaviorChangeInSparkConnectWARN Ereignis im Pipelineereignisprotokoll pro erkanntem Muster aus. Wenn der Scan aktiviert ist und das vorherige erfolgreiche Update Muster erkannt hat, blockiert environment_version SDP auch die Aktivierung, bis die Muster behoben werden.
Jedes Ereignis meldet einen einzelnen Problemcode, der angibt, was erkannt wurde. Um einen Code nachzuschlagen, suchen Sie ihn in der Tabelle "Problemcodes ", wobei jede Zeile mit dem Kategorieabschnitt verknüpft ist, der ein Beispielmuster und die vorgeschlagene Korrektur enthält.
Ereignis-Shape
BehaviorChangeInSparkConnect Ereignisse folgen dem Standardmäßigen Pipelineereignisprotokollschema:
-
event_typeistbehavior_change_in_spark_connect. -
levelistWARN. -
detailsenthält dasbehavior_change_in_spark_connectObjekt, das über ein einzelnesissueFeld verfügt. Der Ausgabewert ist einer der unten aufgeführten Codes. -
messageist eine lesbare Beschreibung des erkannten Musters.
Problemcodes
| Kategorie | Problemcode | Description |
|---|---|---|
| Datenbank- und Katalogmutationen | USE_CATALOG_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Der Standardkatalog wurde geändert, nachdem ein DataFrame erstellt wurde. Der vorhandene DataFrame kann Tabellen mithilfe des neuen Standardkatalogs auflösen. |
| Datenbank- und Katalogmutationen | USE_CATALOG_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR |
USE CATALOG wurde außerhalb einer Funktion aufgerufen, die von einem Rohrleitungsdekoror verziert wurde. Der Standardkatalog kann sich bei nachfolgenden Vorgängen unerwartet ändern. |
| Datenbank- und Katalogmutationen | USE_DATABASE_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Die Standarddatenbank wurde geändert, nachdem ein DataFrame erstellt wurde. Der vorhandene DataFrame kann Tabellen mithilfe der neuen Standarddatenbank auflösen. |
| Datenbank- und Katalogmutationen | USE_DATABASE_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR |
USE DATABASE wurde außerhalb einer Funktion aufgerufen, die von einem Rohrleitungsdekoror verziert wurde. Die Standarddatenbank kann sich bei nachfolgenden Vorgängen unerwartet ändern. |
| Eiferige Ausführung innerhalb von Flussfunktionen | CHECKPOINT_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Die Flussfunktion ruft einen Prüfpunktbefehl auf. |
| Eiferige Ausführung innerhalb von Flussfunktionen | CREATE_DATAFRAME_VIEW_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Die Flussfunktion erstellt eine DataFrame-Ansicht (createOrReplaceTempView oder ähnlich). |
| Eiferige Ausführung innerhalb von Flussfunktionen | CREATE_RESOURCE_PROFILE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Die Flussfunktion erstellt ein Ressourcenprofil. |
| Eiferige Ausführung innerhalb von Flussfunktionen | GET_RESOURCES_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Die Ablauffunktion ruft oder spark.resources eine zugehörige Ressourcen-API auf. |
| Eiferige Ausführung innerhalb von Flussfunktionen | MERGE_INTO_TABLE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Die Flussfunktion führt eine zieltabelle aus MERGE INTO . |
| Eiferige Ausführung innerhalb von Flussfunktionen | ML_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Die Flussfunktion führt einen eifrigen Spark ML-Vorgang aus. |
| Eiferige Ausführung innerhalb von Flussfunktionen | REGISTER_DATA_SOURCE_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Die Flussfunktion registriert eine Python Datenquelle. |
| Eiferige Ausführung innerhalb von Flussfunktionen | STREAMING_QUERY_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Die Flussfunktion arbeitet mit einem aktiven Streamingabfragehandle. |
| Eiferige Ausführung innerhalb von Flussfunktionen | STREAMING_QUERY_LISTENER_BUS_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Die Ablauffunktion registriert oder entfernt einen Streamingabfragelistener. |
| Eiferige Ausführung innerhalb von Flussfunktionen | STREAMING_QUERY_MANAGER_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Die Ablauffunktion ruft zum Verwalten von Streamingabfragen auf spark.streams . |
| Eiferige Ausführung innerhalb von Flussfunktionen | WRITE_OPERATION_V2_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Die Flussfunktion führt einen eifrigen DataFrameWriterV2 Vorgang aus. |
| Eiferige Ausführung innerhalb von Flussfunktionen | WRITE_OPERATION_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Die Flussfunktion führt einen eifrigen DataFrame.write Vorgang aus. |
| Eiferige Ausführung innerhalb von Flussfunktionen | WRITE_STREAM_OPERATION_START_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
Die Flussfunktion startet eine Streamingabfrage (writeStream.start()). |
| Spark-Konfigurationsmutationen | CHANGE_CONF_INSIDE_QUERY_FUNCTION_NOT_SUPPORTED |
spark.conf.set() oder spark.conf.unset() wurde in einer Funktion aufgerufen, die von einem Pipelines-Dekorateur verziert wurde. Dies wird mit einer Umgebungsversion nicht unterstützt. |
| Spark-Konfigurationsmutationen | SET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
spark.conf.set() wurde außerhalb einer Funktion aufgerufen, die von einem Pipelines-Dekorateur nach der Erstellung eines DataFrames eingerichtet wurde. Die Konfigurationsänderung kann sich auf den vorhandenen DataFrame zur Ausführungszeit auswirken. |
| Spark-Konfigurationsmutationen | UNSET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
spark.conf.unset() wurde außerhalb einer Funktion aufgerufen, die von einem Pipelines-Dekorateur nach der Erstellung eines DataFrames eingerichtet wurde. Die Konfigurationsänderung kann sich auf den vorhandenen DataFrame zur Ausführungszeit auswirken. |
| Temporäre Ansichtsersetzungen | REPLACE_GLOBAL_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Eine globale temporäre Ansicht wurde ersetzt, nachdem ein DataFrame darauf verweist. Der Ersatz kann im vorhandenen DataFrame widerspiegelt werden. |
| Temporäre Ansichtsersetzungen | REPLACE_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Eine temporäre Ansicht wurde ersetzt, nachdem ein DataFrame darauf verweist. Der Ersatz kann im vorhandenen DataFrame widerspiegelt werden. |
| UDF- und UDTF-Mutationen | OVERWRITE_SESSION_UDF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Eine UDF wurde mit demselben Namen erneut registriert, nachdem ein DataFrame darauf verweist, dass es erstellt wurde. Der vorhandene DataFrame kann die neue UDF-Definition verwenden. |
| UDF- und UDTF-Mutationen | OVERWRITE_SESSION_UDTF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Ein UDTF wurde mit demselben Namen erneut registriert, nachdem ein DataFrame darauf verweist, dass es erstellt wurde. Der vorhandene DataFrame kann die neue UDTF-Definition verwenden. |
| UDF- und UDTF-Mutationen | UDF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR |
Eine UDF verweist auf eine globale veränderbare Python Variable. Bei einer Umgebungsversion verwendet die UDF den Wert der Variablen zum Zeitpunkt der Definition der UDF, nicht zum Aufrufzeitpunkt. |
| UDF- und UDTF-Mutationen | UDTF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR |
Ein UDTF verweist auf eine globale veränderbare Python Variable. Bei einer Umgebungsversion verwendet udTF den Wert der Variablen zum Zeitpunkt der DEFINITION des UDTF, nicht zum Aufrufzeitpunkt. |
Datenbank- und Katalogmutationen
Diese Probleme werden ausgegeben, wenn Pipelinecode die Standarddatenbank oder den Standardkatalog stummschaltet. Mit einer Umgebungsversion erstellt DataFrames, bevor die Mutation Tabellen mithilfe der neuen Datenbank oder des neuen Katalogs auflösen kann.
Beispielmuster, das ein Ereignis auslöst:
from pyspark import pipelines as dp
spark.sql("USE CATALOG marketing")
df = spark.read.table("events")
spark.sql("USE CATALOG sales") # changes the default catalog after df was created
@dp.materialized_view
def events_summary():
return df.groupBy("region").count()
Ohne Eine Umgebungsversion dfevents wird aus dem marketing Katalog aufgelöst. Bei einer Umgebungsversion df wird events der sales Katalog aufgelöst.
Vorgeschlagene Lösung: Vollqualifizierte Tabellennamen, sodass die Auflösung nicht vom Standardkatalog oder der Standarddatenbank abhängt, und vermeiden Sie das Ändern des Standardkatalogs oder der Datenbank zwischen der Erstellung und Verwendung von DataFrame.
from pyspark import pipelines as dp
df = spark.read.table("marketing.default.events")
@dp.materialized_view
def events_summary():
return df.groupBy("region").count()
Spark-Konfigurationsmutationen
Diese Probleme werden ausgegeben, wenn der Pipelinecode spark-Konfiguration so stummschaltet, dass das DataFrame-Verhalten unter einer Umgebungsversion geändert werden kann.
Beispielmuster, das ein Ereignis auslöst:
from pyspark import pipelines as dp
df = spark.read.table("events")
spark.conf.set("spark.sql.ansi.enabled", "true") # changes session conf after df was created
@dp.materialized_view
def events_strict():
return df.selectExpr("CAST(price AS INT) AS price")
Ohne Eine Umgebungsversion verwendet die Umwandlung den Konf-Wert zur Erstellungszeit von DataFrame. Bei einer Umgebungsversion wird die Umwandlung verwendet spark.sql.ansi.enabled=true und kann bei ungültiger Eingabe fehlschlagen.
Vorgeschlagene Lösung: Legen Sie alle erforderlichen Spark-Konfigurationen oben in der Pipelinedatei fest, bevor DataFrame erstellt wird. Verwenden Sie für die Konfiguration pro Abfrage die Einstellung der Pipeline configuration in der Pipelinespezifikation.
Temporäre Ansichtsersetzungen
Diese Probleme werden ausgegeben, wenn Pipelinecode eine temporäre Ansicht ersetzt, nachdem ein DataFrame darauf verweist, dass er erstellt wurde. Mit einer Umgebungsversion kann der vorhandene DataFrame den neuen Ansichtsinhalt widerspiegeln.
Beispielmuster, das ein Ereignis auslöst:
from pyspark import pipelines as dp
spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
df = spark.sql("SELECT * FROM my_view")
spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
@dp.materialized_view
def mytable():
return df
Enthält ohne eine Umgebungsversion mytable[(1, "Original Row")]. Enthält eine Umgebungsversion mytable[(2, "Replaced Row")].
Vorgeschlagene Lösung: Erstellen Sie jede temporäre Ansicht einzeln, und ersetzen Sie sie nicht. Wenn Sie mehrere Ansichten mit verwandten Daten benötigen, geben Sie jedem einen eindeutigen Namen.
UDF- und UDTF-Mutationen
Diese Probleme werden ausgegeben, wenn Pipelinecode eine UDF- oder UDTF-Datei so stummschaltet, dass sich das Verhalten unter einer Umgebungsversion ändert.
Beispielmuster, das ein Ereignis auslöst:
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf
suffix = "a"
@udf
def my_udf(s):
return s + suffix
suffix = "b"
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))
Enthält ohne eine Umgebungsversion my_mv[("alex_b",)]. Enthält eine Umgebungsversion my_mv[("alex_a",)].
Suggested fix: Übergeben Sie Werte als Argumente an die UDF, anstatt sie aus Python Globalen zu erfassen, oder legen Sie die Globale fest, bevor Sie die UDF definieren und sie danach nicht stummschalten.
from pyspark import pipelines as dp
from pyspark.sql.functions import col, lit, udf
@udf
def append_suffix(s, suffix):
return s + suffix
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(append_suffix(col("name"), lit("b")))
Eiferige Ausführung innerhalb von Flussfunktionen
Diese Probleme werden ausgegeben, wenn pipelinecode einen eifrigen Spark-Befehl in einer Funktion ausführt, die von einem Pipelinedekoror (@table, @materialized_viewusw.) eingerichtet wurde. Flussfunktionen werden erwartet, dass ein DataFrame definiert und zurückgegeben wird; Eifrige Befehle, die Daten schreiben, Streamingabfragen verwalten, Ressourcen registrieren oder ML-Vorgänge ausführen, sind in einer Flussfunktion mit einem Umgebungsversionssatz nicht zulässig.
Vorgeschlagene Lösung: Verschieben Sie den eifrigen Vorgang außerhalb der Flussfunktion, und geben Sie stattdessen einen DataFrame aus der Flussfunktion zurück. Nebeneffekte wie das Schreiben in eine Tabelle oder das Starten einer Streamingabfrage gehören außerhalb der Pipelinedefinition; das Pipelinemodul verarbeitet die Materialisierung des DataFrames, der von der Flussfunktion zurückgegeben wird.
Suchen nach Kompatibilitätsereignissen im Ereignisprotokoll
Die folgende Abfrage gibt alle Kompatibilitätsereignisse für eine Pipeline zurück, sortiert zuerst:
SELECT
timestamp,
message,
details:behavior_change_in_spark_connect:issue AS issue
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
AND level = 'WARN'
ORDER BY timestamp DESC;
So zählen Sie Ereignisse nach Problemcode in den letzten Updates:
SELECT
details:behavior_change_in_spark_connect:issue AS issue,
COUNT(*) AS occurrences
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
AND level = 'WARN'
GROUP BY 1
ORDER BY occurrences DESC;
Informationen zum Abfragen des Ereignisprotokolls finden Sie unter Abfragen des Ereignisprotokolls.
Siehe auch
- Konfigurieren von Umgebungsversionen für Pipelines – Funktionsübersicht, Aktivieren einer Umgebungsversion.
- Pipelineereignisprotokollschema – vollständiges Pipelineereignisprotokollschema.
- Pipelineereignisprotokoll – So wird's gefragt: Abfragen des Pipelineereignisprotokolls.