Spark Dataframe grouping and partition by key with a set number of partitions.
I have a Spark DataFrame with multiple labels and features corresponding to each, like this:
+----------------+--------------------+
| label| feature_paths|
+----------------+--------------------+
| person1|[-0.015756417, 0....|
| person1|[-0.05177306, 0.1...|
| person1|[-0.11631858, 0.1...|
| person2|[-0.058303248, 0....|
| person2|[-0.03415013, 0.0...|
+----------------+--------------------+
I want to train a clustering model for each label (person). So basically, I want to create an RDD for each label and then run a map operation like rdd.map(service), which will eventually save a GMM model for each entity.
The code is like:
def service(rddentry):
    # Each RDD entry is expected to be a (label, features) pair.
    label = rddentry[0]
    features = rddentry[1]
    print(label)

    from sklearn.mixture import BayesianGaussianMixture
    from sklearn.externals import joblib

    # Fit a Bayesian GMM on this label's features and persist it to disk.
    gmm = BayesianGaussianMixture(n_components=3, covariance_type="diag", init_params='kmeans')
    model = gmm.fit(features)
    joblib.dump(model, str(label) + '.joblib')
    return model
The goals I want to achieve are:
1. Create an RDD where the number of partitions equals the number of unique labels, so that rdd.getNumPartitions() == number_of_unique_labels.
2. Each RDD entry contains multiple features, all belonging to a single label.
3. Send each RDD partition to the service function.
My experiments so far:
When doing sdf.repartition('label'), it creates several empty DataFrames. sdf.partitionBy('label') also does not work; it creates a random number of partitions.
I have spent almost two days on this with no concrete results so far. Any help or guidance in the right direction would be appreciated.
pyspark apache-spark-sql
asked Nov 15 '18 at 5:33 by Pradip Gupta
1 Answer
You can use partitionBy with new HashPartitioner(number_of_partitions).
One extra action is required to count the unique labels, and you can use that count as the number of required partitions.
Here is a sample. Note: you need a paired RDD to do this, so after repartitioning you can map over it to extract the values back out of the tuples.
scala> import org.apache.spark.HashPartitioner
scala> val data = sc.parallelize(List("1","1","1","2","3","4","4","4"),4)
scala> data.glom.collect
res20: Array[Array[String]] = Array(Array(1, 1), Array(1, 2), Array(3, 4), Array(4, 4))
scala> val data_repart = data.keyBy(x=>x).partitionBy(new HashPartitioner(data.distinct.count.toInt))
scala> data_repart.glom.collect
res21: Array[Array[(String, String)]] = Array(Array((4,4), (4,4), (4,4)), Array((1,1), (1,1), (1,1)), Array((2,2)), Array((3,3)))
scala> data_repart.map(_._2).glom.collect
res22: Array[Array[String]] = Array(Array(4, 4, 4), Array(1, 1, 1), Array(2), Array(3))
Let me know if it helps.
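Since the question is tagged pyspark, here is a minimal PySpark sketch of the same idea. This is an adaptation under assumptions, not part of the original answer: it reuses the sdf DataFrame and the service function from the question, counts the distinct labels, hash-partitions a paired RDD by label, and hands each label's collected feature vectors to service.
# Minimal PySpark sketch (assumes sdf and service as defined in the question).
# Count the distinct labels; this drives the number of partitions.
num_labels = sdf.select('label').distinct().count()

# Key each row by its label to get a paired RDD, then hash-partition it into
# one partition per distinct label. As with the Scala HashPartitioner example,
# hash collisions can still place two labels in the same partition, but all
# rows of a given label always land together.
paired = sdf.rdd.map(lambda row: (row['label'], row['feature_paths']))
partitioned = paired.partitionBy(num_labels)

# Gather every feature vector of a label into one entry and pass each
# (label, list_of_features) pair to service(), which fits and saves a GMM.
models = (partitioned
          .groupByKey()
          .map(lambda kv: service((kv[0], list(kv[1]))))
          .collect())
If a strictly per-partition call is preferred instead of per-key, mapPartitions could replace groupByKey, but it would then have to handle the case where a hash collision puts more than one label in the same partition.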
answered Nov 15 '18 at 7:11 by Sathiyan S (edited Nov 15 '18 at 18:27 by Ali AzG)