Spark: RDD Left Outer Join Optimization for Duplicate Keys



THE SCENARIO



I'm trying to write a Spark program that efficiently performs a left outer join between two RDDs. One caveat is that these RDDs can have duplicate keys, which apparently causes the whole program to be inefficient.



What I'm trying to achieve is simple:



  • Given two RDDs: rdd1 and rdd2 (both have the same structure: (k, v))

  • Using rdd1 and rdd2, generate another RDD rdd3 that has the structure: (k1, v1, List(v2..))

  • k1 and v1 come from rdd1 (same values, so rdd1 and rdd3 have the same length)

  • List(v2..) is a list whose values come from the values of rdd2

  • To add a value v from rdd2 to the list in rdd3's tuple, its key k (from rdd2) should match the key k from rdd1 (a small plain-Scala illustration of this shape follows)
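
For concreteness, here is a minimal plain-Scala sketch (not Spark, and not part of the original question; the names rdd1Data, rdd2Data and rdd3Data are illustrative only) of the shape being described:

// Plain-Scala illustration of the desired transformation, not the Spark solution.
val rdd1Data = Seq((1, "a"), (1, "b"), (2, "a"))
val rdd2Data = Seq((1, "v"), (1, "w"), (2, "v"), (4, "z"))

// Group rdd2's values by key once...
val rdd2ByKey: Map[Int, Seq[String]] =
  rdd2Data.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }

// ...then attach the matching list to every (k, v1) pair from rdd1.
val rdd3Data: Seq[((Int, String), Seq[String])] =
  rdd1Data.map { case (k, v1) => ((k, v1), rdd2ByKey.getOrElse(k, Seq.empty)) }
// rdd3Data: Seq(((1,a),Seq(v, w)), ((1,b),Seq(v, w)), ((2,a),Seq(v)))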

MY ATTEMPT



My approach was to use a left outer join. So, I came up with something like this:



rdd1.leftOuterJoin(rdd2)
  .map { case (k, (v1, v2)) => ((k, v1), Array(v2)) } // v2 is an Option because of the left outer join
  .reduceByKey(_ ++ _)


This actually produces the result that I'm trying to achieve. But when I use huge data, the program becomes very slow.



AN EXAMPLE



Just in case my idea is not clear yet, I have the following example:



Given two RDDs that have the following data:



rdd1:



key | value
-----------
1 | a
1 | b
1 | c
2 | a
2 | b
3 | c


rdd2:



key | value
-----------
1 | v
1 | w
1 | x
1 | y
1 | z
2 | v
2 | w
2 | x
3 | y
4 | z


The resulting rdd3 should be



key | value | list
------------------------
1 | a | v,w,x,y,z
1 | b | v,w,x,y,z
1 | c | v,w,x,y,z
2 | a | v,w,x
2 | b | v,w,x
3 | c | y









apache-spark join rdd






asked Nov 15 '18 at 7:55









m2008m1033m








  • Why not group by key before joining on these keys?

    – Terry Dactyl
    Nov 15 '18 at 8:37











  • @TerryDactyl I think grouping would slow down the program even further, am I right?

    – m2008m1033m
    Nov 15 '18 at 8:50











  • I guess it depends on how you generate rdd1 and rdd2. Before trying anything else how are rdd1 and rdd2 partitioned? If you are unfamiliar with spark joins it is definitely worth googling spark sql join optimization. If you can ensure your data is partitioned so that the identical keys are co-located on the same executor then your join will be faster.

    – Terry Dactyl
    Nov 15 '18 at 9:04












  • @TerryDactyl That was actually what I wanted to do from the start! I want to have some control over how the tuples are partitioned so that the ones with identical keys end up in the same place. I did some searching on the Internet but did not find much about the topic.

    – m2008m1033m
    Nov 15 '18 at 9:10











  • I believe co-location is guaranteed if the RDDs are partitioned using the same partitioner in the same job (a rough sketch of this idea follows these comments).

    – Terry Dactyl
    Nov 15 '18 at 9:11
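
As a rough illustration of that last comment (this sketch is not from the original thread; the partition count 8 is an arbitrary placeholder), both RDDs can be partitioned with the same HashPartitioner so that records sharing a key end up in the same partition and the join avoids an extra shuffle:

import org.apache.spark.HashPartitioner

// Same partitioner for both sides => identical keys land in the same partition.
val partitioner = new HashPartitioner(8) // placeholder partition count

val rdd1Part = rdd1.partitionBy(partitioner).cache()
val rdd2Part = rdd2.partitionBy(partitioner).cache()

// The join can reuse the existing partitioning instead of shuffling both sides again.
val joined = rdd1Part.leftOuterJoin(rdd2Part)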












1 Answer
First of all, don't use:



map { ... => (..., Array(...)) }.reduceByKey(_ ++ _)


That's pretty much as inefficient as it gets. To group values like this using RDDs you should really go with groupByKey.
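
For reference, a minimal sketch (not part of the original answer) of the groupByKey variant of the question's attempt, using the question's rdd1 and rdd2; it still joins first, so the cogroup approach described next is preferable:

// Group the (possibly missing) right-hand values per (key, left value) pair.
val grouped = rdd1.leftOuterJoin(rdd2)
  .map { case (k, (v1, v2)) => ((k, v1), v2) }
  .groupByKey() // RDD[((K, V1), Iterable[Option[V2]])]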



Additionally, joining and then grouping by key afterwards is pretty wasteful. You are doing the same job (grouping by key) on the right-hand side twice. It makes more sense to use cogroup directly (that's how RDD joins work) and flatMap:



val rdd1 = sc.parallelize(Seq(
  (1, "a"), (1, "b"), (1, "c"), (2, "a"), (2, "b"), (3, "c")
))

val rdd2 = sc.parallelize(Seq(
  (1, "v"), (1, "w"), (1, "x"), (1, "y"), (1, "z"), (2, "v"),
  (2, "w"), (2, "x"), (3, "y"), (4, "z")
))

val rdd = rdd1
  .cogroup(rdd2)
  .flatMapValues { case (left, right) => left.map((_, right)) }
  .map { case (k1, (k2, vs)) => ((k1, k2), vs) }
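
If you need the exact (k, v1, List(v2..)) shape from the question, one more map gets you there (this reshaping step is a sketch, not part of the original answer; the order of values inside each list is not guaranteed):

// Flatten ((k, v1), values) into the (k, v1, List(v2..)) shape.
val rdd3 = rdd.map { case ((k, v1), vs) => (k, v1, vs.toList) }

rdd3.take(3).foreach(println)
// e.g. (1,a,List(v, w, x, y, z))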


You can also use the Dataset API, which tends to be more efficient in such cases:



import org.apache.spark.sql.functions.collect_list
import spark.implicits._ // for rdd.toDF and the $"..." column syntax

val df1 = rdd1.toDF("k", "v")
val df2 = rdd2.toDF("k", "v")

df2.groupBy("k")
  .agg(collect_list("v").as("list"))
  .join(rdd1.toDF("k", "v"), Seq("k"), "rightouter")
  .show


The result:



+---+---------------+---+ 
| k| list| v|
+---+---------------+---+
| 1|[v, w, x, y, z]| a|
| 1|[v, w, x, y, z]| b|
| 1|[v, w, x, y, z]| c|
| 3| [y]| c|
| 2| [v, w, x]| a|
| 2| [v, w, x]| b|
+---+---------------+---+


If the intersection of the key sets is small, you can try to optimize the process by applying a Bloom filter first:



import org.apache.spark.sql.functions.udf

val should_keep = {
  val f = df1.stat.bloomFilter("k", df1.count, 0.005)
  udf((x: Any) => f.mightContain(x))
}

df2.where(should_keep($"k")).groupBy("k")
  .agg(collect_list("v").as("list"))
  .join(rdd1.toDF("k", "v"), Seq("k"), "rightouter")
  .show


+---+---------------+---+
| k| list| v|
+---+---------------+---+
| 1|[v, w, x, y, z]| a|
| 1|[v, w, x, y, z]| b|
| 1|[v, w, x, y, z]| c|
| 3| [y]| c|
| 2| [v, w, x]| a|
| 2| [v, w, x]| b|
+---+---------------+---+


When using the Dataset API, be sure to adjust spark.sql.shuffle.partitions to reflect the amount of data you process.
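
For example, a minimal way to set it (not from the original answer; 400 is only a placeholder value to tune for your data volume):

// Number of partitions used for Dataset/DataFrame shuffles (joins, aggregations).
spark.conf.set("spark.sql.shuffle.partitions", "400")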



Note:



None of that will help you if the number of duplicates in rdd2 is large. In such a case the overall problem formulation is impossible to defend and you should try to reformulate it, taking into account the requirements of the downstream process.






answered Nov 15 '18 at 10:18









user10465355

  • Nice solution! Regarding your first comment on using reduceByKey: what I know is that reducing first happens within each executor and then across executors via shuffling, which can greatly help with minimizing the traffic between executors. groupByKey, on the other hand, starts by shuffling and then groups the items, resulting in a larger number of items being exchanged.

    – m2008m1033m
    Nov 16 '18 at 10:31











  • @m2008m1033m I recommend reading this.

    – user10465355
    Nov 16 '18 at 12:15











  • I'm trying to apply the DataFrames API, but it looks like it does not perform as well as RDDs. I posted a question regarding this; could you please have a look?

    – m2008m1033m
    Dec 18 '18 at 4:45

















