Percentile-based CountDistinct in a Spark dataframe - Scala



My problem is this.
I have a dataframe created from NetFlow records that contains the columns minuteMs (the same value for all flows in a given minute), sourcePort, and other values such as sourceIP, packetDelta, etc.
There is one row per flow, and there can be many flows in one minute.
What I want to do is compute the number of distinct ports across all flows in each one-minute interval, but I only want to count the distinct ports that fall within the 90th percentile with regard to how common they are.
In other words, I want the count of the 90% most common ports; the 10% least common do not interest me.



What I imagine I need to do is the following (a rough sketch of these steps appears after the list):

  1. Create a separate dataframe for each minute.

     In these resulting dataframes:

  2. Group by port number.

  3. Count the number of elements in each group.

  4. Sort the groups by the number of elements.

  5. Take the top 90% of these groups and count how many there are. This gives one value, X, which is the 90th-percentile count of distinct ports for that minute.

     Back "outside":

  6. Create a one-column dataframe with as many rows as there are minutes, filled with the X values.

  7. Join this column with the other dataframe I create by grouping and ordering by minute and aggregating the other values (the logic for those is not relevant to this question).

  8. The final result is a dataframe with a single row for every minute, one of its columns being the desired count of the top 90% most common distinct source ports.

The issue is that I am very new to Scala and have trouble even putting together code for the outlined steps. More importantly, this feels extremely inefficient and could probably be done much more elegantly; I just don't know how. Any help or pointers are appreciated.
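
A literal rendering of steps 1-5 might look like the sketch below. The `flows` DataFrame name is assumed, and the "least common 10%" rule is an interpretation inferred from the expected output further down (a port is dropped when it accounts for at most 10% of the minute's flows):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._

    // Sketch only: computes X (step 5) for a single minute of `flows`.
    // The 10% rule here is an assumption that matches the expected output.
    def topPortCount(flows: DataFrame, minute: Long): Long = {
      val slice = flows.filter(col("minuteMs") === minute) // step 1: one minute
      val totalFlows = slice.count()                       // flows in this minute
      slice
        .groupBy("srcPortNumber")                          // step 2
        .count()                                           // step 3
        .filter(col("count") > totalFlows * 0.1)           // step 5: drop the rare 10%
        .count()                                           // X: remaining distinct ports
    }

Steps 6-8 would then collect one X per minute on the driver and join the values back, which is exactly the per-minute round-tripping that makes this inefficient; the window-function approach suggested in the comments below does the same work in one pass.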



Mock-up of the input dataframe:

    flowID  minuteMs  srcPortNumber  packetDelta
    0       146000    80             5
    1       146000    90             8
    2       146000    90             4
    3       146000    90             4
    4       146000    90             17
    5       146000    90             22
    6       146000    80             2
    7       146000    80             3
    8       146000    80             4
    9       146000    343            3
    10      152000    133            5
    11      152000    80             31
    12      152000    8081           4

Expected output:

    minuteMs  90thPercCountDistPorts  packetCount
    146000    2                       72
    152000    3                       40


The packetCount column is just the summed packets from all flows in the given minute. I don't need help with it; it is only there to illustrate that other columns will be present as well.



Note that port 343 at time 146000 is not counted: it appears in only one of the ten flows in that window, so it falls into the least common 10% and should not be counted.










scala apache-spark dataframe apache-spark-sql percentile

asked Nov 15 '18 at 16:01 by Matěj Nemec, edited Nov 15 '18 at 21:57

  • Can you post a sample of both the input and the expected output?

    – sramalingam24
    Nov 15 '18 at 17:09

  • Added; I should have thought of that right away.

    – Matěj Nemec
    Nov 15 '18 at 18:04

  • Well, I guess no one can think of a more elegant solution... I will start implementing mine, then, and post the code here if I succeed.

    – Matěj Nemec
    Nov 17 '18 at 13:34

  • I would try grouping by minute and srcport to get the counts, then a window function partitioned by minute and ordered by count to get cumulative sums, and then a select with a filter.

    – sramalingam24
    Nov 17 '18 at 16:23
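
For reference, here is a sketch of the approach from the last comment, using the `flows` DataFrame built from the question's sample data. The boundary condition on the cumulative sum (strict inequality, measured before the current port) is an assumption chosen to match the expected output, not necessarily the commenter's exact intent:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    // Counts per (minute, port): "grouping by minute and srcport".
    val portCounts = flows
      .groupBy("minuteMs", "srcPortNumber")
      .agg(count("*").as("flowCount"))

    // Running sum of flows, most common ports first, within each minute.
    val byMinute = Window
      .partitionBy("minuteMs")
      .orderBy(col("flowCount").desc)
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    // Total flows per minute (an unordered window spans the whole partition).
    val minuteTotal = Window.partitionBy("minuteMs")

    val percPorts = portCounts
      .withColumn("cumFlows", sum("flowCount").over(byMinute))
      .withColumn("totalFlows", sum("flowCount").over(minuteTotal))
      // Keep a port while the flows covered *before* it are under 90% of the
      // total; this drops port 343 (9 of 10 flows already covered) while
      // keeping all three ports in minute 152000.
      .filter(col("cumFlows") - col("flowCount") < col("totalFlows") * 0.9)
      .groupBy("minuteMs")
      .agg(count("*").as("90thPercCountDistPorts"))

    // Join with the other per-minute aggregates, e.g. packetCount.
    val result = flows
      .groupBy("minuteMs")
      .agg(sum("packetDelta").as("packetCount"))
      .join(percPorts, Seq("minuteMs"))

    result.show()

With the sample data this should yield 146000 → 2 ports / 72 packets and 152000 → 3 ports / 40 packets, matching the expected output; how a port sitting exactly on the 90% boundary is treated is the main knob to adjust.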
















