How to parse JSON in a Beam pipeline?
My data is newline-delimited JSON and looks like the sample below. I am reading this data from a Kafka topic.
"sender":"S1","senderHost":"ip-10-20-30-40","timestamp":"2018-08-13T16:17:12.874Z","topic":"test","messageType":"type_1","data":"name":"John Doe", "id":"12DROIY321"
I want to build an Apache Beam pipeline that reads this data from Kafka and parses the JSON to produce output like this:
S1,2018-08-13T16:17:12.874Z,type_1,12DROIY321
The output is a comma-delimited string consisting of the sender, timestamp, messageType, and the id from within data.
My code so far is as below:
public class Pipeline1 {

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();

        // Create the Pipeline object with the options we defined above.
        Pipeline p = Pipeline.create(options);

        p.apply(KafkaIO.<Long, String>read()
                .withBootstrapServers("localhost:9092")
                .withTopic("test")
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object) "earliest"))
                // We're writing to a file, which does not support unbounded data sources.
                // This line bounds the source to the first 35 records.
                // In reality, we would likely be writing to a sink that supports unbounded
                // data, such as BigQuery.
                .withMaxNumRecords(35)
                .withoutMetadata()) // PCollection<KV<Long, String>>
         .apply(Values.<String>create())
         .apply(TextIO.write().to("test"));

        p.run().waitUntilFinish();
    }
}
I am unable to figure out how to parse the JSON into the required CSV format within the pipeline. Using the code above, I can write the unmodified JSON lines to a file, and with the standalone code below I can parse the JSON, but how do I accomplish this parsing as an additional step in the Beam pipeline logic?
JSONParser parser = new JSONParser();
Object obj = null;
try {
    obj = parser.parse(strLine);
} catch (ParseException e) {
    e.printStackTrace();
}
JSONObject jsonObject = (JSONObject) obj;
String sender = (String) jsonObject.get("sender");
String messageType = (String) jsonObject.get("messageType");
String timestamp = (String) jsonObject.get("timestamp");
System.out.println(sender + "," + timestamp + "," + messageType);
java json apache-kafka apache-beam
asked Nov 15 '18 at 2:43 by Utsav Chatterjee
Two things here: 1) you can use jsonObject.getString("timestamp") instead of jsonObject.get("timestamp"). 2) What do you mean by "how to parse the json to get the required csv format within the pipeline"? So you want to save some attributes of the JSON to a CSV file? Can you show an example? – Deadpool Nov 15 '18 at 3:10

Kafka has a JSONDeserializer, by the way – cricket_007 Nov 15 '18 at 3:15
1 Answer
According to the documentation, you will need to write a transformation (or find one that matches your use case).
https://beam.apache.org/documentation/programming-guide/#composite-transforms
The documentation also provides an excellent example.
Example that should produce your output:
.apply(Values.<String>create())
.apply("JSONtoData", // the transform name
        ParDo.of(new DoFn<String, String>() { // a DoFn as an anonymous inner class instance
            @ProcessElement
            public void processElement(@Element String strLine, OutputReceiver<String> out) {
                JSONParser parser = new JSONParser();
                Object obj = null;
                try {
                    obj = parser.parse(strLine);
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                JSONObject jsonObject = (JSONObject) obj;
                String sender = (String) jsonObject.get("sender");
                String messageType = (String) jsonObject.get("messageType");
                String timestamp = (String) jsonObject.get("timestamp");
                // the requested "id" sits inside the nested "data" object
                String id = (String) ((JSONObject) jsonObject.get("data")).get("id");
                out.output(sender + "," + timestamp + "," + messageType + "," + id);
            }
        }));
To return CSV values, just change the generics to:
new DoFn<String, YourCSVClassHere>()
OutputReceiver<YourCSVClassHere> out
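Keep in mind that a custom output type also needs a Coder so Beam can serialize elements between workers. A minimal sketch of what such a class might look like (CsvRow is a hypothetical name, and SerializableCoder is used here only for brevity):

import java.io.Serializable;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.SerializableCoder;

// Hypothetical output type; @DefaultCoder tells Beam how to encode it.
@DefaultCoder(SerializableCoder.class)
public class CsvRow implements Serializable {
    public final String sender;
    public final String timestamp;
    public final String messageType;
    public final String id;

    public CsvRow(String sender, String timestamp, String messageType, String id) {
        this.sender = sender;
        this.timestamp = timestamp;
        this.messageType = messageType;
        this.id = id;
    }

    // TextIO.write() expects Strings, so render the row as a CSV line first.
    @Override
    public String toString() {
        return sender + "," + timestamp + "," + messageType + "," + id;
    }
}

With this, out.output(new CsvRow(...)) would type-check, though for writing with TextIO the plain String version above is the simpler route.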
I didn't test this code, use at own risk.
answered Nov 15 '18 at 3:13 by Friwi
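For completeness, here is how that ParDo might slot into the question's pipeline, between Values.create() and TextIO.write(), so the output file receives CSV lines instead of raw JSON. This is an untested sketch assembled from the question's code and the answer above; the json-simple imports and the nested "data" lookup for the id follow the question's sample record:

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

public class Pipeline1 {

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);

        p.apply(KafkaIO.<Long, String>read()
                .withBootstrapServers("localhost:9092")
                .withTopic("test")
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object) "earliest"))
                .withMaxNumRecords(35)   // bound the stream so TextIO can write it
                .withoutMetadata())      // PCollection<KV<Long, String>>
         .apply(Values.<String>create()) // keep only the message payloads
         // New step: turn each JSON line into the desired CSV line.
         .apply("JSONtoData", ParDo.of(new DoFn<String, String>() {
             @ProcessElement
             public void processElement(@Element String strLine, OutputReceiver<String> out) {
                 try {
                     JSONObject jsonObject = (JSONObject) new JSONParser().parse(strLine);
                     JSONObject data = (JSONObject) jsonObject.get("data");
                     out.output(jsonObject.get("sender") + ","
                             + jsonObject.get("timestamp") + ","
                             + jsonObject.get("messageType") + ","
                             + data.get("id"));
                 } catch (ParseException e) {
                     // Malformed records are logged and skipped; a production
                     // pipeline might route them to a dead-letter output instead.
                     e.printStackTrace();
                 }
             }
         }))
         .apply(TextIO.write().to("test"));

        p.run().waitUntilFinish();
    }
}

The only change from the question's pipeline is the ParDo inserted before TextIO.write(), which is exactly the extra step the question asks about.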