PySpark: take the latest updated value from the column









I have a dataframe as follows:



+----+--------+--------+------+
| id | value1 | value2 | flag |
+----+--------+--------+------+
|  1 |   7000 |     30 |    0 |
|  2 |      0 |      9 |    0 |
|  3 |  23627 |     17 |    1 |
|  4 |   8373 |     23 |    0 |
|  5 |   -0.5 |      4 |    1 |
+----+--------+--------+------+


I want to apply the following conditions:

1. If value1 is greater than 0, I want the previous row's value2.

2. If value1 is equal to 0, I want the average of the previous row's and the next row's value2.

3. If value1 is less than 0, then NULL.

So I wrote the following code:



w = Window.orderBy('id')   # window over the whole frame, ordered by id (assumed row order)
df = df.withColumn('value2', when(col('value1') > 0, lag('value2').over(w))
                   .when(col('value1') == 0, (lag('value2').over(w) + lead('value2').over(w)) / 2.0)
                   .otherwise(lit(None)))
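
For this to run, col, when, lag, lead and lit come from pyspark.sql.functions and Window from pyspark.sql.window; lag and lead are window functions, so the .over(w) calls above are required (the ordering by id is an assumption about how the rows are arranged):

from pyspark.sql.functions import col, lag, lead, lit, when
from pyspark.sql.window import Window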


What I want is to use the already-updated value when I take the previous and next rows' value2, as shown below. The updates should happen in order: first compute the new value for id 1, then let id 2 use that updated value, and so on.



+----+--------+--------+------+
| id | value1 | value2 | flag |
+----+--------+--------+------+
|  1 |   7000 |   null |    0 |
|  2 |      0 |    8.5 |    0 |
|  3 |  23627 |    8.5 |    1 |
|  4 |   8373 |    8.5 |    0 |
|  5 |   -0.5 |   null |    1 |
+----+--------+--------+------+


I tried restricting the condition to id==1 inside the when, reassigning the dataframe, and then performing the withColumn/when operations again.



df = df.withColumn('value2', when((col('id') == 1) & (col('value1') > 0), lag('value2').over(w))
                   .when((col('id') == 1) & (col('value1') == 0), (lag('value2').over(w) + lead('value2').over(w)) / 2.0)
                   .when((col('id') == 1) & (col('value1') < 0), lit(None))
                   .otherwise(col('value2')))


After this I'll have the updated column value, and if I do the same operation again for id==2, it can pick up the updated value. But I certainly cannot do that for every id. How can I achieve this?
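
For reference, repeating that operation for every id would amount to something like the loop below (only a sketch, assuming the rows are ordered by id; as discussed in the comments below, looping over every id of a huge dataset is inefficient, and the nulls produced for earlier rows still propagate through the average):

from pyspark.sql.functions import col, lag, lead, lit, when
from pyspark.sql.window import Window

w = Window.orderBy('id')
ids = [r.id for r in df.select('id').orderBy('id').collect()]  # all ids, in order
for i in ids:
    # update only the row with the current id, so later rows see already-updated values
    df = df.withColumn(
        'value2',
        when((col('id') == i) & (col('value1') > 0), lag('value2').over(w))
        .when((col('id') == i) & (col('value1') == 0),
              (lag('value2').over(w) + lead('value2').over(w)) / 2.0)
        .when((col('id') == i) & (col('value1') < 0), lit(None))
        .otherwise(col('value2')))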










  • Where did you try adding id==1?
    – karma4917
    Nov 9 at 17:04










  • @karma4917 edited, please take a look
    – Visualisation App
    Nov 9 at 19:16










  • Did you try putting that in some loop?
    – karma4917
    Nov 9 at 20:25











  • If I have a huge dataset, a loop is an inefficient way, right?
    – Visualisation App
    Nov 10 at 6:07










  • Can you please add the expected output?
    – Ali Yesilli
    Nov 10 at 12:34














1 Answer













from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window


spark = SparkSession \
    .builder \
    .appName('test') \
    .getOrCreate()


# sample input taken from the question
tab_inp = [(1, 7000.0, 30, 0), (2, 0.0, 9, 0), (3, 23627.0, 17, 1),
           (4, 8373.0, 23, 0), (5, -0.5, 4, 1)]
tab_data = spark.sparkContext.parallelize(tab_inp)

schema = StructType([StructField('id', IntegerType(), True),
                     StructField('value1', FloatType(), True),
                     StructField('value2', IntegerType(), True),
                     StructField('flag', IntegerType(), True)])

table = spark.createDataFrame(tab_data, schema)
table.createOrReplaceTempView("table")

# a constant 'dummy' column puts every row into a single window partition;
# ordering by 'id' makes lag/lead refer to the previous/next row
dummy_df = table.withColumn('dummy', lit('dummy'))
win = Window.partitionBy('dummy').orderBy('id')
pre_value = dummy_df.withColumn('pre_value', lag('value2').over(win))
cmb_value = pre_value.withColumn('next_value', lead('value2').over(win))

new_column = (when(col('value1') > 0, cmb_value.pre_value)
              .when(col('value1') < 0, cmb_value.next_value)
              .otherwise((cmb_value.pre_value + cmb_value.next_value) / 2))

final_table = cmb_value.withColumn('value', new_column)


Above "final_table" will have field you are expecting.






  • Window functions need a partitionBy column to identify the previous and next values.
    – skay
    Nov 9 at 22:11










  • yeah this is just an example of what I require. In my code I partition it by timestamps
    – Visualisation App
    Nov 10 at 5:57










  • This will not give me the desired result. It will give null for id=1 since there is no lag and for id=2 it will give (30+9)/2, while I want the updated value (null+9)/2
    – Visualisation App
    Nov 10 at 6:06










  • Can you please give more detail on the scenario: for id=1, what is your expectation? My assumption: as value1 > 0 we have to populate (id - 1), i.e. lag; as this is the first record, the new column will be populated with null. If you want working code, please provide more info and sample data.
    – skay
    Nov 12 at 20:25











  • the second df in my question is the expected output
    – Visualisation App
    2 days ago









