Spark DataFrame grouping and partitioning by key with a set number of partitions










I have a Spark DataFrame with multiple labels and features corresponding to each, like this:



+----------------+--------------------+
| label| feature_paths|
+----------------+--------------------+
| person1|[-0.015756417, 0....|
| person1|[-0.05177306, 0.1...|
| person1|[-0.11631858, 0.1...|
| person2|[-0.058303248, 0....|
| person2|[-0.03415013, 0.0...|
+----------------+--------------------+


I want to train a clustering model for each label (person). Basically, I want to create an RDD entry per label and then run a map operation like rdd.map(service), which will eventually save a GMM model for each person.



The code is like:



def service(rddentry):
    # rddentry is expected to be a (label, features) tuple, where features
    # holds the feature vectors belonging to that label
    label = rddentry[0]
    features = rddentry[1]

    print(label)

    # imports are kept inside the function so they are resolved on the executors
    from sklearn.mixture import BayesianGaussianMixture
    from sklearn.externals import joblib

    gmm = BayesianGaussianMixture(n_components=3, covariance_type="diag", init_params='kmeans')
    model = gmm.fit(features)
    joblib.dump(model, str(label) + '.joblib')

    return model
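
For reference, service expects each RDD entry to be a (label, features) tuple where features holds all the feature vectors of that label, roughly like this (hypothetical, truncated values):

service(("person1", [[-0.015756417, 0.2], [-0.05177306, 0.1], [-0.11631858, 0.1]]))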


The goals I want to achieve are:



  1. Create an RDD where the number of partitions is equal to the number of unique labels, such that rdd.getNumPartitions() = no_of_unique_labels.
    Each RDD entry will have multiple features, all belonging to a single label.


  2. Send each RDD partition to the service function.


My experiments so far:



  1. When doing sdf.repartition('label'), it creates several empty partitions (see the sketch after this list).


  2. sdf.partitionBy('label') also does not work; it creates a random number of partitions.
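
For context on experiment 1, a minimal sketch (assuming sdf is the DataFrame shown above): repartitioning by a column alone falls back to spark.sql.shuffle.partitions (200 by default), which is why most of the resulting partitions come out empty.

repartitioned = sdf.repartition("label")       # column only, no explicit partition count
print(repartitioned.rdd.getNumPartitions())    # typically 200 (spark.sql.shuffle.partitions), mostly empty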


I have spent almost two days on this with no concrete results so far. Any help or guidance in the right direction would be appreciated.
















      pyspark apache-spark-sql






asked Nov 15 '18 at 5:33 by Pradip Gupta






















1 Answer




















You can use partitionBy with new HashPartitioner(number_of_partitions).

One extra action is required to count the unique labels, and you can use that count as the number of required partitions.

Here is a sample. Note: you need a paired RDD to do this, so after the repartition you can map over the tuples to get back just the items you need.

scala> val data = sc.parallelize(List("1","1","1","2","3","4","4","4"),4)
scala> data.glom.collect
res20: Array[Array[String]] = Array(Array(1, 1), Array(1, 2), Array(3, 4), Array(4, 4))
scala> val data_repart = data.keyBy(x=>x).partitionBy(new HashPartitioner(data.distinct.count.toInt))
scala> data_repart.glom.collect
res21: Array[Array[(String, String)]] = Array(Array((4,4), (4,4), (4,4)), Array((1,1), (1,1), (1,1)), Array((2,2)), Array((3,3)))
scala> data_repart.map(_._2).glom.collect
res22: Array[Array[String]] = Array(Array(4, 4, 4), Array(1, 1, 1), Array(2), Array(3))


Let me know if it helps.
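
Since the question is tagged pyspark, a rough, untested sketch of the same idea in PySpark might look like this (it assumes sdf is the DataFrame and service is the function from the question):

# Build a paired RDD of (label, feature_vector) rows from the DataFrame.
pair_rdd = sdf.rdd.map(lambda row: (row['label'], row['feature_paths']))

# One extra action to count the unique labels, as noted above.
n_labels = pair_rdd.keys().distinct().count()

# Hash-partition by key so all rows of a label land in the same partition.
# Two labels can still collide into one partition, so we group by label again below.
partitioned = pair_rdd.partitionBy(n_labels)

def train_partition(rows):
    from collections import defaultdict
    grouped = defaultdict(list)
    for label, features in rows:
        grouped[label].append(features)
    # Call service() once per label with all of its feature vectors.
    return [service((label, feats)) for label, feats in grouped.items()]

models = partitioned.mapPartitions(train_partition).collect()

If strict per-partition control is not required, pair_rdd.groupByKey(numPartitions=n_labels).map(lambda kv: service((kv[0], list(kv[1])))) achieves the same grouping more directly.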






answered Nov 15 '18 at 7:11 by Sathiyan S, edited Nov 15 '18 at 18:27 by Ali AzG




























