Spark Dataframes UPSERT to Postgres Table

I am using Apache Spark DataFrames to join two data sources and get the result as another DataFrame. I want to write the result to another Postgres table. I see this option:
myDataFrame.write.jdbc(url, table, connectionProperties)
But what I want to do is UPSERT the DataFrame into the table based on the primary key of the table. How can this be done? I am using Spark 1.6.0.
postgresql scala apache-spark apache-spark-sql spark-dataframe
asked Jan 6 '16 at 21:33 by void
4 Answers

Accepted answer (13 votes), answered by zero323 (Jan 6 '16 at 21:53):
It is not supported. DataFrameWriter can either append to or overwrite an existing table. If your application requires more complex logic, you'll have to deal with this manually.
One option is to use an action (foreach, foreachPartition) with a standard JDBC connection. Another is to write to a temporary table and handle the rest directly in the database.
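A minimal sketch of the temporary-table option on PostgreSQL 9.5+ (which added INSERT ... ON CONFLICT) might look like the following; the table names staging_table/target_table and their columns (id, value) are assumptions, while url and connectionProperties are the values from the question:
import java.sql.DriverManager

// Stage the DataFrame in a scratch table first, overwriting any previous run.
myDataFrame.write.mode("overwrite").jdbc(url, "staging_table", connectionProperties)

// Then merge the staged rows into the target inside the database.
val conn = DriverManager.getConnection(url, connectionProperties)
try {
  conn.createStatement().executeUpdate(
    """INSERT INTO target_table (id, value)
      |SELECT id, value FROM staging_table
      |ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value""".stripMargin)
} finally {
  conn.close()
}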
– void (Jan 7 '16 at 6:59): Also, how do I overwrite an existing JDBC table? I can only see the option of df.write.mode().saveAsTable(), but this doesn't seem to support JDBC tables.
– Aydin K. (Feb 13 at 12:43): dataframe.write.mode(SaveMode.Overwrite)
Answer (9 votes), answered by jstuartmill (Oct 6 '16 at 4:25, edited Jul 23 '17 at 22:12):
KrisP has the right of it, but the best way to do an upsert is not through a row-by-row prepared statement: that method will insert one row at a time, with as many partitions as the number of workers you have. If you want to do this in batches, you can as well:
import java.sql.{Connection, DriverManager, PreparedStatement}

dataframe.coalesce(numWorkers).foreachPartition { batch =>    // numWorkers: number of workers
  val dbc: Connection = DriverManager.getConnection("JDBC_URL")
  val st: PreparedStatement = dbc.prepareStatement("YOUR PREPARED STATEMENT")
  batch.grouped(batchSize).foreach { session =>               // batchSize: rows per batch
    session.foreach { x =>
      st.setDouble(1, x.getDouble(1))                         // bind each statement parameter
      st.addBatch()
    }
    st.executeBatch()
  }
  dbc.close()
}
This will execute the batches on each worker and close the DB connection. It gives you control over the number of workers and the number of batches, and lets you work within those confines.
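For the "YOUR PREPARED STATEMENT" placeholder above, a PostgreSQL 9.5+ upsert could look like this sketch; the table my_table and its columns (id, value) are assumptions:
// Hypothetical target table: my_table(id BIGINT PRIMARY KEY, value DOUBLE PRECISION).
// EXCLUDED refers to the row whose insert hit the conflict.
val upsertSql =
  """INSERT INTO my_table (id, value) VALUES (?, ?)
    |ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value""".stripMargin
Each parameter would then be bound with st.setLong/st.setDouble before addBatch(), as in the loop above.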
Can "Update" sql statement be used here to update the single row in database table?
– User007
Aug 16 at 19:20
Yep, the prepared statement can be an insert.
– jstuartmill
Aug 18 at 4:14
Answer (8 votes), answered by KrisP (Jan 6 '16 at 22:10):
If you are going to do it manually, via option 1 mentioned by zero323, you should take a look at the Spark source code for the insert statement here:
def insertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = {
  val columns = rddSchema.fields.map(_.name).mkString(",")
  val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
  val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)"
  conn.prepareStatement(sql)
}
The PreparedStatement is part of java.sql, and it has methods like execute() and executeUpdate(). You still have to modify the sql accordingly, of course.
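Following that pattern, a sketch of an upsert variant for PostgreSQL 9.5+ could look like this; upsertStatement and its keyColumns parameter (the target table's primary-key columns) are hypothetical, not part of Spark:
import java.sql.{Connection, PreparedStatement}
import org.apache.spark.sql.types.StructType

def upsertStatement(conn: Connection, table: String, rddSchema: StructType,
                    keyColumns: Seq[String]): PreparedStatement = {
  val columns = rddSchema.fields.map(_.name)
  val placeholders = columns.map(_ => "?").mkString(",")
  // On conflict, update every non-key column from the proposed row.
  val updates = columns.filterNot(keyColumns.contains)
    .map(c => s"$c = EXCLUDED.$c").mkString(", ")
  val sql = s"INSERT INTO $table (${columns.mkString(",")}) VALUES ($placeholders) " +
    s"ON CONFLICT (${keyColumns.mkString(",")}) DO UPDATE SET $updates"
  conn.prepareStatement(sql)
}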
Answer (2 votes), answered by Soumitra (Mar 4 '16 at 21:08):
To insert via JDBC you can use
dataframe.write.mode(SaveMode.Append).jdbc(jdbc_url, table_name, connection_properties)
Also, DataFrame.write gives you a DataFrameWriter, which has some methods to insert the dataframe:
def insertInto(tableName: String): Unit
Inserts the content of the DataFrame into the specified table. It requires that the schema of the DataFrame be the same as the schema of the table. Because it inserts data into an existing table, format or options will be ignored.
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
There is nothing out of the box in Spark yet to update individual records, though.
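As a sketch, the connection_properties above could be built like this; the host, database, and credentials are placeholders:
import java.util.Properties
import org.apache.spark.sql.SaveMode

// Hypothetical connection details -- substitute your own.
val connection_properties = new Properties()
connection_properties.setProperty("user", "postgres")
connection_properties.setProperty("password", "secret")
connection_properties.setProperty("driver", "org.postgresql.Driver")

dataframe.write
  .mode(SaveMode.Append)
  .jdbc("jdbc:postgresql://localhost:5432/mydb", "my_table", connection_properties)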