SparkSQL subquery and performance
In order to allow users of the system dynamically create(via application web UI) different data dictionaries with auxiliary data, I use DataFrames and expose them as temp tables, for example:
Seq("Italy", "France", "United States", "Spain").toDF("country").createOrReplaceTempView("big_countries")
Seq("Poland", "Hungary", "Spain").toDF("country").createOrReplaceTempView("medium_countries")
The number of these dictionaries is only limited by the user imagination and business needs.
After that users also create different queries which may use conditions based on the previously defined auxiliary data, for example SQL WHERE
conditions:
Q1: country IN (FROM medium_countries)
Q2: (TRUE = ((country IN (FROM medium_countries)) AND (country IN (FROM big_countries))) AND EMAIL IS NOT NULL) AND phone = '+91-9111999998'
Q3: TRUE = ((country IN (FROM medium_countries)) AND (country IN (FROM big_countries))) AND EMAIL IS NOT NULL
......
Qn: name = 'Donald' AND email = 'donald@example.com' AND phone = '+1-2222222222'
The number of these queries is only limited by the user imagination and business needs.
My biggest worry right now is the subquery like country IN (FROM medium_countries)
I can't use explicit JOIN
here according to the system design so I limited to use subqueries. So I have a question - typically the size of these auxiliary data tables should be relatively small... I think a few thousand rows in the worst case and the total number of these tables - a few hundred in the worst case. Taking this into account, can this approach lead to the performance issue and is there any technics exist that can optimize the process, like caching these dictionaries in memory and so on?
UPDATED
Right now I can test it only in Spark Local Mode
Query:
country IN (FROM big_countries)
Execution plan:
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|plan |tag|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|== Physical Plan ==
*(1) Project [unique_id#27L]
+- *(1) BroadcastHashJoin [country#22], [country#3], LeftSemi, BuildRight
:- *(1) Project [country#22, unique_id#27L]
: +- LocalTableScan [name#19, email#20, phone#21, country#22, unique_id#27L]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- LocalTableScan [country#3]|big|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
Query:
TRUE = ((country IN (FROM medium_countries)) AND (country IN (FROM big_countries))) AND EMAIL IS NOT NULL
Execution plan:
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|plan |tag|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|== Physical Plan ==
*(1) Project [unique_id#27L]
+- *(1) Filter (true = (exists#53 && exists#54))
+- *(1) BroadcastHashJoin [country#22], [country#3], ExistenceJoin(exists#54), BuildRight
:- *(1) BroadcastHashJoin [country#22], [country#8], ExistenceJoin(exists#53), BuildRight
: :- *(1) Project [country#22, unique_id#27L]
: : +- *(1) Filter isnotnull(EMAIL#20)
: : +- LocalTableScan [name#19, email#20, phone#21, country#22, unique_id#27L]
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
: +- LocalTableScan [country#8]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- LocalTableScan [country#3]|big|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
apache-spark apache-spark-sql
add a comment |
In order to allow users of the system dynamically create(via application web UI) different data dictionaries with auxiliary data, I use DataFrames and expose them as temp tables, for example:
Seq("Italy", "France", "United States", "Spain").toDF("country").createOrReplaceTempView("big_countries")
Seq("Poland", "Hungary", "Spain").toDF("country").createOrReplaceTempView("medium_countries")
The number of these dictionaries is only limited by the user imagination and business needs.
After that users also create different queries which may use conditions based on the previously defined auxiliary data, for example SQL WHERE
conditions:
Q1: country IN (FROM medium_countries)
Q2: (TRUE = ((country IN (FROM medium_countries)) AND (country IN (FROM big_countries))) AND EMAIL IS NOT NULL) AND phone = '+91-9111999998'
Q3: TRUE = ((country IN (FROM medium_countries)) AND (country IN (FROM big_countries))) AND EMAIL IS NOT NULL
......
Qn: name = 'Donald' AND email = 'donald@example.com' AND phone = '+1-2222222222'
The number of these queries is only limited by the user imagination and business needs.
My biggest worry right now is the subquery like country IN (FROM medium_countries)
I can't use explicit JOIN
here according to the system design so I limited to use subqueries. So I have a question - typically the size of these auxiliary data tables should be relatively small... I think a few thousand rows in the worst case and the total number of these tables - a few hundred in the worst case. Taking this into account, can this approach lead to the performance issue and is there any technics exist that can optimize the process, like caching these dictionaries in memory and so on?
UPDATED
Right now I can test it only in Spark Local Mode
Query:
country IN (FROM big_countries)
Execution plan:
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|plan |tag|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|== Physical Plan ==
*(1) Project [unique_id#27L]
+- *(1) BroadcastHashJoin [country#22], [country#3], LeftSemi, BuildRight
:- *(1) Project [country#22, unique_id#27L]
: +- LocalTableScan [name#19, email#20, phone#21, country#22, unique_id#27L]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- LocalTableScan [country#3]|big|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
Query:
TRUE = ((country IN (FROM medium_countries)) AND (country IN (FROM big_countries))) AND EMAIL IS NOT NULL
Execution plan:
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|plan |tag|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|== Physical Plan ==
*(1) Project [unique_id#27L]
+- *(1) Filter (true = (exists#53 && exists#54))
+- *(1) BroadcastHashJoin [country#22], [country#3], ExistenceJoin(exists#54), BuildRight
:- *(1) BroadcastHashJoin [country#22], [country#8], ExistenceJoin(exists#53), BuildRight
: :- *(1) Project [country#22, unique_id#27L]
: : +- *(1) Filter isnotnull(EMAIL#20)
: : +- LocalTableScan [name#19, email#20, phone#21, country#22, unique_id#27L]
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
: +- LocalTableScan [country#8]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- LocalTableScan [country#3]|big|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
apache-spark apache-spark-sql
Could you post Physical execution plan of the dataframe? Output of the Dataframe.explain.
– alexeipab
Nov 11 at 8:36
Sure, updated the question
– alexanoid
Nov 11 at 8:44
add a comment |
In order to allow users of the system dynamically create(via application web UI) different data dictionaries with auxiliary data, I use DataFrames and expose them as temp tables, for example:
Seq("Italy", "France", "United States", "Spain").toDF("country").createOrReplaceTempView("big_countries")
Seq("Poland", "Hungary", "Spain").toDF("country").createOrReplaceTempView("medium_countries")
The number of these dictionaries is only limited by the user imagination and business needs.
After that users also create different queries which may use conditions based on the previously defined auxiliary data, for example SQL WHERE
conditions:
Q1: country IN (FROM medium_countries)
Q2: (TRUE = ((country IN (FROM medium_countries)) AND (country IN (FROM big_countries))) AND EMAIL IS NOT NULL) AND phone = '+91-9111999998'
Q3: TRUE = ((country IN (FROM medium_countries)) AND (country IN (FROM big_countries))) AND EMAIL IS NOT NULL
......
Qn: name = 'Donald' AND email = 'donald@example.com' AND phone = '+1-2222222222'
The number of these queries is only limited by the user imagination and business needs.
My biggest worry right now is the subquery like country IN (FROM medium_countries)
I can't use explicit JOIN
here according to the system design so I limited to use subqueries. So I have a question - typically the size of these auxiliary data tables should be relatively small... I think a few thousand rows in the worst case and the total number of these tables - a few hundred in the worst case. Taking this into account, can this approach lead to the performance issue and is there any technics exist that can optimize the process, like caching these dictionaries in memory and so on?
UPDATED
Right now I can test it only in Spark Local Mode
Query:
country IN (FROM big_countries)
Execution plan:
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|plan |tag|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|== Physical Plan ==
*(1) Project [unique_id#27L]
+- *(1) BroadcastHashJoin [country#22], [country#3], LeftSemi, BuildRight
:- *(1) Project [country#22, unique_id#27L]
: +- LocalTableScan [name#19, email#20, phone#21, country#22, unique_id#27L]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- LocalTableScan [country#3]|big|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
Query:
TRUE = ((country IN (FROM medium_countries)) AND (country IN (FROM big_countries))) AND EMAIL IS NOT NULL
Execution plan:
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|plan |tag|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|== Physical Plan ==
*(1) Project [unique_id#27L]
+- *(1) Filter (true = (exists#53 && exists#54))
+- *(1) BroadcastHashJoin [country#22], [country#3], ExistenceJoin(exists#54), BuildRight
:- *(1) BroadcastHashJoin [country#22], [country#8], ExistenceJoin(exists#53), BuildRight
: :- *(1) Project [country#22, unique_id#27L]
: : +- *(1) Filter isnotnull(EMAIL#20)
: : +- LocalTableScan [name#19, email#20, phone#21, country#22, unique_id#27L]
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
: +- LocalTableScan [country#8]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- LocalTableScan [country#3]|big|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
apache-spark apache-spark-sql
In order to allow users of the system dynamically create(via application web UI) different data dictionaries with auxiliary data, I use DataFrames and expose them as temp tables, for example:
Seq("Italy", "France", "United States", "Spain").toDF("country").createOrReplaceTempView("big_countries")
Seq("Poland", "Hungary", "Spain").toDF("country").createOrReplaceTempView("medium_countries")
The number of these dictionaries is only limited by the user imagination and business needs.
After that users also create different queries which may use conditions based on the previously defined auxiliary data, for example SQL WHERE
conditions:
Q1: country IN (FROM medium_countries)
Q2: (TRUE = ((country IN (FROM medium_countries)) AND (country IN (FROM big_countries))) AND EMAIL IS NOT NULL) AND phone = '+91-9111999998'
Q3: TRUE = ((country IN (FROM medium_countries)) AND (country IN (FROM big_countries))) AND EMAIL IS NOT NULL
......
Qn: name = 'Donald' AND email = 'donald@example.com' AND phone = '+1-2222222222'
The number of these queries is only limited by the user imagination and business needs.
My biggest worry right now is the subquery like country IN (FROM medium_countries)
I can't use explicit JOIN
here according to the system design so I limited to use subqueries. So I have a question - typically the size of these auxiliary data tables should be relatively small... I think a few thousand rows in the worst case and the total number of these tables - a few hundred in the worst case. Taking this into account, can this approach lead to the performance issue and is there any technics exist that can optimize the process, like caching these dictionaries in memory and so on?
UPDATED
Right now I can test it only in Spark Local Mode
Query:
country IN (FROM big_countries)
Execution plan:
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|plan |tag|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|== Physical Plan ==
*(1) Project [unique_id#27L]
+- *(1) BroadcastHashJoin [country#22], [country#3], LeftSemi, BuildRight
:- *(1) Project [country#22, unique_id#27L]
: +- LocalTableScan [name#19, email#20, phone#21, country#22, unique_id#27L]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- LocalTableScan [country#3]|big|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
Query:
TRUE = ((country IN (FROM medium_countries)) AND (country IN (FROM big_countries))) AND EMAIL IS NOT NULL
Execution plan:
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|plan |tag|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|== Physical Plan ==
*(1) Project [unique_id#27L]
+- *(1) Filter (true = (exists#53 && exists#54))
+- *(1) BroadcastHashJoin [country#22], [country#3], ExistenceJoin(exists#54), BuildRight
:- *(1) BroadcastHashJoin [country#22], [country#8], ExistenceJoin(exists#53), BuildRight
: :- *(1) Project [country#22, unique_id#27L]
: : +- *(1) Filter isnotnull(EMAIL#20)
: : +- LocalTableScan [name#19, email#20, phone#21, country#22, unique_id#27L]
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
: +- LocalTableScan [country#8]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- LocalTableScan [country#3]|big|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
apache-spark apache-spark-sql
apache-spark apache-spark-sql
edited Nov 11 at 12:26
asked Nov 11 at 7:50
alexanoid
7,0821178173
7,0821178173
Could you post Physical execution plan of the dataframe? Output of the Dataframe.explain.
– alexeipab
Nov 11 at 8:36
Sure, updated the question
– alexanoid
Nov 11 at 8:44
add a comment |
Could you post Physical execution plan of the dataframe? Output of the Dataframe.explain.
– alexeipab
Nov 11 at 8:36
Sure, updated the question
– alexanoid
Nov 11 at 8:44
Could you post Physical execution plan of the dataframe? Output of the Dataframe.explain.
– alexeipab
Nov 11 at 8:36
Could you post Physical execution plan of the dataframe? Output of the Dataframe.explain.
– alexeipab
Nov 11 at 8:36
Sure, updated the question
– alexanoid
Nov 11 at 8:44
Sure, updated the question
– alexanoid
Nov 11 at 8:44
add a comment |
1 Answer
1
active
oldest
votes
I think that:
CACHE TABLE tbl as in sql("CACHE TABLE tbl")
is what you need to be executed after your:
...createOrReplaceTempView....
but before the larger queries of course.
In SPARK now, the above statement on "caching" is now eager by default, not lazy. As the manual states you don’t need to trigger cache materialization manually anymore. That is to say, no longer the need to execute a df.show or df.count.
Once in memory - the until you refresh explicitly, this data need not be gotten every time again and here it looks like no filtering, rather just load all the limited set of data once.
Not knowing your design but looking at it, the sub-query should be fine. Try this approach and look at the Physical Plan. In traditional RDBMS's, this type of limited sub-query - from what I can see - is also not a deal breaker.
You can also see that the Physical Plan shows the Catalyst Optimizer has optimized / converted your IN sub-query already to a JOIN, a typical performance improvement for larger data sets.
As a result "broadcasting" of the smaller tables to executor's worker nodes occurs to improve performance as well. You probably need not set any limit for Broadcasting, but you could set this explicitly, but may take is this is not required, based on what I observe.
add a comment |
Your Answer
StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");
StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "1"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);
else
createEditor();
);
function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);
);
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53246811%2fsparksql-subquery-and-performance%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
I think that:
CACHE TABLE tbl as in sql("CACHE TABLE tbl")
is what you need to be executed after your:
...createOrReplaceTempView....
but before the larger queries of course.
In SPARK now, the above statement on "caching" is now eager by default, not lazy. As the manual states you don’t need to trigger cache materialization manually anymore. That is to say, no longer the need to execute a df.show or df.count.
Once in memory - the until you refresh explicitly, this data need not be gotten every time again and here it looks like no filtering, rather just load all the limited set of data once.
Not knowing your design but looking at it, the sub-query should be fine. Try this approach and look at the Physical Plan. In traditional RDBMS's, this type of limited sub-query - from what I can see - is also not a deal breaker.
You can also see that the Physical Plan shows the Catalyst Optimizer has optimized / converted your IN sub-query already to a JOIN, a typical performance improvement for larger data sets.
As a result "broadcasting" of the smaller tables to executor's worker nodes occurs to improve performance as well. You probably need not set any limit for Broadcasting, but you could set this explicitly, but may take is this is not required, based on what I observe.
add a comment |
I think that:
CACHE TABLE tbl as in sql("CACHE TABLE tbl")
is what you need to be executed after your:
...createOrReplaceTempView....
but before the larger queries of course.
In SPARK now, the above statement on "caching" is now eager by default, not lazy. As the manual states you don’t need to trigger cache materialization manually anymore. That is to say, no longer the need to execute a df.show or df.count.
Once in memory - the until you refresh explicitly, this data need not be gotten every time again and here it looks like no filtering, rather just load all the limited set of data once.
Not knowing your design but looking at it, the sub-query should be fine. Try this approach and look at the Physical Plan. In traditional RDBMS's, this type of limited sub-query - from what I can see - is also not a deal breaker.
You can also see that the Physical Plan shows the Catalyst Optimizer has optimized / converted your IN sub-query already to a JOIN, a typical performance improvement for larger data sets.
As a result "broadcasting" of the smaller tables to executor's worker nodes occurs to improve performance as well. You probably need not set any limit for Broadcasting, but you could set this explicitly, but may take is this is not required, based on what I observe.
add a comment |
I think that:
CACHE TABLE tbl as in sql("CACHE TABLE tbl")
is what you need to be executed after your:
...createOrReplaceTempView....
but before the larger queries of course.
In SPARK now, the above statement on "caching" is now eager by default, not lazy. As the manual states you don’t need to trigger cache materialization manually anymore. That is to say, no longer the need to execute a df.show or df.count.
Once in memory - the until you refresh explicitly, this data need not be gotten every time again and here it looks like no filtering, rather just load all the limited set of data once.
Not knowing your design but looking at it, the sub-query should be fine. Try this approach and look at the Physical Plan. In traditional RDBMS's, this type of limited sub-query - from what I can see - is also not a deal breaker.
You can also see that the Physical Plan shows the Catalyst Optimizer has optimized / converted your IN sub-query already to a JOIN, a typical performance improvement for larger data sets.
As a result "broadcasting" of the smaller tables to executor's worker nodes occurs to improve performance as well. You probably need not set any limit for Broadcasting, but you could set this explicitly, but may take is this is not required, based on what I observe.
I think that:
CACHE TABLE tbl as in sql("CACHE TABLE tbl")
is what you need to be executed after your:
...createOrReplaceTempView....
but before the larger queries of course.
In SPARK now, the above statement on "caching" is now eager by default, not lazy. As the manual states you don’t need to trigger cache materialization manually anymore. That is to say, no longer the need to execute a df.show or df.count.
Once in memory - the until you refresh explicitly, this data need not be gotten every time again and here it looks like no filtering, rather just load all the limited set of data once.
Not knowing your design but looking at it, the sub-query should be fine. Try this approach and look at the Physical Plan. In traditional RDBMS's, this type of limited sub-query - from what I can see - is also not a deal breaker.
You can also see that the Physical Plan shows the Catalyst Optimizer has optimized / converted your IN sub-query already to a JOIN, a typical performance improvement for larger data sets.
As a result "broadcasting" of the smaller tables to executor's worker nodes occurs to improve performance as well. You probably need not set any limit for Broadcasting, but you could set this explicitly, but may take is this is not required, based on what I observe.
answered Nov 12 at 18:43
thebluephantom
2,3552925
2,3552925
add a comment |
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Some of your past answers have not been well-received, and you're in danger of being blocked from answering.
Please pay close attention to the following guidance:
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53246811%2fsparksql-subquery-and-performance%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Could you post Physical execution plan of the dataframe? Output of the Dataframe.explain.
– alexeipab
Nov 11 at 8:36
Sure, updated the question
– alexanoid
Nov 11 at 8:44