PySpark: take the latest updated value from the column
I have a dataframe as follows:
+----+--------+--------+------+
| id | value1 | value2 | flag |
+----+--------+--------+------+
|  1 |   7000 |     30 |    0 |
|  2 |      0 |      9 |    0 |
|  3 |  23627 |     17 |    1 |
|  4 |   8373 |     23 |    0 |
|  5 |   -0.5 |      4 |    1 |
+----+--------+--------+------+
I want to apply the following conditions:
1. If value1 is greater than 0, I want the previous row's value2.
2. If value1 is equal to 0, I want the average of the previous and next rows' value2.
3. If value1 is less than 0, then NULL.
So I wrote the following code:
w = Window.orderBy('id')  # window giving the row order
df = df.withColumn('value2', when(col('value1') > 0, lag('value2').over(w))
                   .when(col('value1') == 0, (lag('value2').over(w) + lead('value2').over(w)) / 2.0)
                   .otherwise(None))
What I want is to use the already-updated value whenever I take the previous or next row's value2, like the following. It should process the rows in order: first compute the value for id 1 and update it, then for id 2 use that updated value, and so on.
+----+--------+--------+------+
| id | value1 | value2 | flag |
+----+--------+--------+------+
|  1 |   7000 |   null |    0 |
|  2 |      0 |    8.5 |    0 |
|  3 |  23627 |    8.5 |    1 |
|  4 |   8373 |    8.5 |    0 |
|  5 |   -0.5 |   null |    1 |
+----+--------+--------+------+
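To make the sequential dependency concrete, here is a driver-side sketch of the update order I mean, run on the collected sample rows (illustration only, not an efficient solution; treating a missing previous value as 0 when averaging is my assumption to reproduce the 8.5 above):
rows = sorted(df.collect(), key=lambda r: r['id'])
updated = []  # updated value2 values, built in id order
for i, r in enumerate(rows):
    prev = updated[i - 1] if i > 0 else None                    # already-updated previous value2
    nxt = rows[i + 1]['value2'] if i + 1 < len(rows) else None  # next row's original value2
    if r['value1'] > 0:
        updated.append(prev)
    elif r['value1'] == 0:
        updated.append(((prev or 0) + (nxt or 0)) / 2.0)
    else:
        updated.append(None)
# updated == [None, 8.5, 8.5, 8.5, None] for the sample data above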
I tried restricting the when to id==1, reassigning the dataframe, and then repeating the withColumn/when operations:
df = df.withColumn('value2', when((col('id') == 1) & (col('value1') > 0), lag('value2').over(w))
                   .when((col('id') == 1) & (col('value1') == 0), (lag('value2').over(w) + lead('value2').over(w)) / 2.0)
                   .when((col('id') == 1) & (col('value1') < 0), None)
                   .otherwise(col('value2')))
After this I get the updated column value for id==1, and if I do the same operation again for id==2, I get its updated value as well. But I certainly cannot do that for every id. How can I achieve this?
pyspark
Where did you try adding id==1?
– karma4917
Nov 9 at 17:04
@karma4917 Edited, please take a look.
– Visualisation App
Nov 9 at 19:16
Did you try putting that in some loop?
– karma4917
Nov 9 at 20:25
If I have a huge dataset, a loop is inefficient, right?
– Visualisation App
Nov 10 at 6:07
Can you please add the expected output?
– Ali Yesilli
Nov 10 at 12:34
1 Answer
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window

spark = SparkSession \
    .builder \
    .appName('test') \
    .getOrCreate()

# tab_inp was not shown in the original answer; here it is assumed to be the
# sample rows from the question as (id, value1, value2, flag) tuples.
tab_inp = [(1, 7000.0, 30, 0),
           (2, 0.0, 9, 0),
           (3, 23627.0, 17, 1),
           (4, 8373.0, 23, 0),
           (5, -0.5, 4, 1)]
tab_data = spark.sparkContext.parallelize(tab_inp)

schema = StructType([StructField('id', IntegerType(), True),
                     StructField('value1', FloatType(), True),
                     StructField('value2', IntegerType(), True),
                     StructField('flag', IntegerType(), True)])
table = spark.createDataFrame(tab_data, schema)
table.createOrReplaceTempView("table")

# A constant 'dummy' column puts the whole frame into a single window partition;
# ordering by 'id' makes lag/lead deterministic.
dummy_df = table.withColumn('dummy', lit('dummy'))
w = Window.partitionBy('dummy').orderBy('id')
pre_value = dummy_df.withColumn('pre_value', lag(col('value2')).over(w))
cmb_value = pre_value.withColumn('next_value', lead(col('value2')).over(w))

new_column = (when(col('value1') > 0, cmb_value.pre_value)
              .when(col('value1') < 0, cmb_value.next_value)
              .otherwise((cmb_value.pre_value + cmb_value.next_value) / 2))
final_table = cmb_value.withColumn('value', new_column)
Above "final_table" will have field you are expecting.
Window functions need a partitionBy column to identify the previous and next values.
– skay
Nov 9 at 22:11
Yeah, this is just an example of what I require. In my actual code I partition by timestamps.
– Visualisation App
Nov 10 at 5:57
This will not give me the desired result. It will give null for id=1 since there is no lag, and for id=2 it will give (30+9)/2, while I want the updated value (null+9)/2.
– Visualisation App
Nov 10 at 6:06
Can you please give more detail on the scenario? For id=1, what is your expectation? My assumption: as value1 > 0 we populate from the previous row (id-1), i.e. lag, and since this is the first record the new column will be null. If you want working code, please provide more info and sample data.
– skay
Nov 12 at 20:25
The second df in my question is the expected output.
– Visualisation App
2 days ago