Spark: RDD Left Outer Join Optimization for Duplicate Keys
THE SCENARIO
I'm trying to write a Spark program that efficiently performs a left outer join between two RDDs. One caveat is that these RDDs can have duplicate keys, which apparently causes the whole program to be inefficient.
What I'm trying to achieve is simple:
- Given two RDDs, rdd1 and rdd2, both with the same structure: (k, v).
- Using rdd1 and rdd2, generate another RDD, rdd3, with the structure (k1, v1, List(v2..)), where:
  - k1 and v1 come from rdd1 (same values, so rdd1 and rdd3 will have the same length);
  - List(v2..) is a list whose values come from the values of rdd2.
- An rdd2 value v is added to the list in an rdd3 tuple only if its key k (the key from rdd2) matches the k from rdd1.
(A type-level sketch of these shapes follows below.)
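In Scala terms, a minimal sketch of the shapes described above; the Int and String element types are placeholders for illustration only, not part of the question:

import org.apache.spark.rdd.RDD

// Placeholder element types; the real key/value types come from the actual data.
val rdd1: RDD[(Int, String)] = ???                 // (k, v)
val rdd2: RDD[(Int, String)] = ???                 // (k, v)
// Desired output: one record per rdd1 record, carrying all matching rdd2 values.
val rdd3: RDD[(Int, String, List[String])] = ???   // (k1, v1, List(v2..))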
MY ATTEMPT
My approach was to use a left outer join. So, I came up with something like this:
rdd1.leftOuterJoin(rdd2)
  .map { case (k, (v1, v2)) => ((k, v1), Array(v2)) }
  .reduceByKey(_ ++ _)
This actually produces the result I'm trying to achieve. But when I use huge data, the program becomes very slow.
AN EXAMPLE
Just in case my idea is not clear yet, I have the following example:
Given two RDDs that have the following data:
rdd1:
key | value
-----------
1 | a
1 | b
1 | c
2 | a
2 | b
3 | c
rdd2:
key | value
-----------
1 | v
1 | w
1 | x
1 | y
1 | z
2 | v
2 | w
2 | x
3 | y
4 | z
The resulting rdd3 should be:
key | value | list
------------------------
1 | a | v,w,x,y,z
1 | b | v,w,x,y,z
1 | c | v,w,x,y,z
2 | a | v,w,x
2 | b | v,w,x
3 | c | y
Tags: apache-spark, join, rdd
Why not group by key before joining on these keys?
– Terry Dactyl
Nov 15 '18 at 8:37
@TerryDactyl I think grouping would slow down the program even further, am I right?
– m2008m1033m
Nov 15 '18 at 8:50
I guess it depends on how you generate rdd1 and rdd2. Before trying anything else how are rdd1 and rdd2 partitioned? If you are unfamiliar with spark joins it is definitely worth googling spark sql join optimization. If you can ensure your data is partitioned so that the identical keys are co-located on the same executor then your join will be faster.
– Terry Dactyl
Nov 15 '18 at 9:04
@TerryDactyl That was actually what I wanted to do from the start! I want to have some control over how the tuples are partitioned so that the ones with the same keys end up in the same place. I did some searching on the Internet but did not find much about the topic.
– m2008m1033m
Nov 15 '18 at 9:10
I believe colocation is guaranteed if the rdds are partitioned using the same partitioner in the same job.
– Terry Dactyl
Nov 15 '18 at 9:11
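To make the co-partitioning idea from these comments concrete, here is a minimal sketch, assuming rdd1 and rdd2 are the key-value RDDs from the question; the partition count of 8 is an arbitrary placeholder:

import org.apache.spark.HashPartitioner

// Partition both RDDs with the same partitioner so that records with equal
// keys land in the same partition.
val partitioner = new HashPartitioner(8)   // placeholder partition count
val p1 = rdd1.partitionBy(partitioner)
val p2 = rdd2.partitionBy(partitioner)

// A join (or cogroup) of two RDDs that share a partitioner can reuse the
// existing partitioning instead of shuffling both sides again.
val joined = p1.leftOuterJoin(p2)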
1 Answer
First of all, don't use:
map ... => (..., Array(...)).reduceByKey(_ ++ _)
That's pretty much as inefficient as it gets. To group values like this using RDDs, you should really go with groupByKey.
Additionally, doing the join just to groupByKey afterwards is pretty wasteful: you are doing the same job (grouping by key) on the right-hand side twice. It makes more sense to use cogroup directly (that's how RDD joins work) and flatMap:
val rdd1 = sc.parallelize(Seq(
  (1, "a"), (1, "b"), (1, "c"), (2, "a"), (2, "b"), (3, "c")
))

val rdd2 = sc.parallelize(Seq(
  (1, "v"), (1, "w"), (1, "x"), (1, "y"), (1, "z"), (2, "v"),
  (2, "w"), (2, "x"), (3, "y"), (4, "z")
))

val rdd = rdd1
  .cogroup(rdd2)
  .flatMapValues { case (left, right) => left.map((_, right)) }
  .map { case (k1, (k2, vs)) => ((k1, k2), vs) }
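For the sample data above, this should yield pairs along the lines of ((1, a), [v, w, x, y, z]), ((2, a), [v, w, x]) and ((3, c), [y]); element order within each group is not guaranteed, and rdd2's key 4 drops out because no rdd1 record matches it.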
You can also use the Dataset API, which tends to be more efficient in such cases:
import org.apache.spark.sql.functions.collect_list
val df1 = rdd1.toDF("k", "v")
val df2 = rdd2.toDF("k", "v")
df2.groupBy("k")
.agg(collect_list("v").as("list"))
.join(rdd1.toDF("k", "v"), Seq("k"), "rightouter")
.show
The result:
+---+---------------+---+
| k| list| v|
+---+---------------+---+
| 1|[v, w, x, y, z]| a|
| 1|[v, w, x, y, z]| b|
| 1|[v, w, x, y, z]| c|
| 3| [y]| c|
| 2| [v, w, x]| a|
| 2| [v, w, x]| b|
+---+---------------+---+
If the intersection of the key sets is small, you can try to optimize the process by applying a Bloom filter first:
import org.apache.spark.sql.functions.udf

val should_keep = {
  // Build a Bloom filter over df1's keys (expected item count, false positive rate)
  val f = df1.stat.bloomFilter("k", df1.count, 0.005)
  udf((x: Any) => f.mightContain(x))
}

df2.where(should_keep($"k")).groupBy("k")
  .agg(collect_list("v").as("list"))
  .join(rdd1.toDF("k", "v"), Seq("k"), "rightouter")
  .show
+---+---------------+---+
| k| list| v|
+---+---------------+---+
| 1|[v, w, x, y, z]| a|
| 1|[v, w, x, y, z]| b|
| 1|[v, w, x, y, z]| c|
| 3| [y]| c|
| 2| [v, w, x]| a|
| 2| [v, w, x]| b|
+---+---------------+---+
When using the Dataset API, please be sure to adjust spark.sql.shuffle.partitions to reflect the amount of data you process.
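For example, a minimal sketch of adjusting that setting at runtime; the value 400 is only an illustrative placeholder, and spark here is the usual SparkSession:

// Tune the number of shuffle partitions before running the aggregation/join;
// the right value depends on cluster size and data volume.
spark.conf.set("spark.sql.shuffle.partitions", "400")
// Alternatively, pass it at submit time: --conf spark.sql.shuffle.partitions=400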
Note: none of this will help you if the number of duplicates in rdd2 is large. In such a case the overall problem formulation is impossible to defend, and you should try to reformulate it, taking the requirements of the downstream process into account.
Nice solution! Regarding your first comment on using reduceByKey: as I understand it, reducing first happens on the same executor and only then across executors via shuffling, which can greatly reduce the traffic between executors. groupByKey, on the other hand, starts by shuffling and then groups the items, so a larger number of items is exchanged.
– m2008m1033m
Nov 16 '18 at 10:31
@m2008m1033m I recommend reading this.
– user10465355
Nov 16 '18 at 12:15
I'm trying to apply the DataFrame API, but it looks like it does not perform as well as RDDs. I posted a question about this; could you please have a look?
– m2008m1033m
Dec 18 '18 at 4:45