How to parse JSON in a Beam pipeline?


My data is newline-delimited JSON, read from a Kafka topic, and looks like this:


{"sender":"S1","senderHost":"ip-10-20-30-40","timestamp":"2018-08-13T16:17:12.874Z","topic":"test","messageType":"type_1","data":{"name":"John Doe","id":"12DROIY321"}}


I want to build an Apache Beam pipeline that reads this data from Kafka and parses the JSON to produce output like this:



S1,2018-08-13T16:17:12.874Z,type_1,12DROIY321


The output is a comma-delimited string consisting of the sender, timestamp, messageType, and the id from within data.



My code so far is as follows:



import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Values;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.google.common.collect.ImmutableMap;

public class Pipeline1 {

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();

        // Create the Pipeline object with the options we defined above.
        Pipeline p = Pipeline.create(options);

        p.apply(KafkaIO.<Long, String>read()
                .withBootstrapServers("localhost:9092")
                .withTopic("test")
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object) "earliest"))
                // We're writing to a file, which does not support unbounded sources,
                // so bound the read to the first 35 records. In reality we would likely
                // write to a sink that supports unbounded data, such as BigQuery.
                .withMaxNumRecords(35)
                .withoutMetadata()) // PCollection<KV<Long, String>>
         .apply(Values.<String>create())
         .apply(TextIO.write().to("test"));

        p.run().waitUntilFinish();
    }
}





I am unable to figure out how to parse the JSON into the required CSV format within the pipeline. Using the code above I can write the raw JSON lines to a file, and with the standalone snippet below I can parse the JSON, but can anyone help me add this parsing as a step in the Beam pipeline?



JSONParser parser = new JSONParser();
Object obj = null;
try {
    obj = parser.parse(strLine);
} catch (ParseException e) {
    e.printStackTrace();
}
JSONObject jsonObject = (JSONObject) obj;

String sender = (String) jsonObject.get("sender");
String messageType = (String) jsonObject.get("messageType");
String timestamp = (String) jsonObject.get("timestamp");

System.out.println(sender + "," + timestamp + "," + messageType);









java json apache-kafka apache-beam

asked Nov 15 '18 at 2:43 by Utsav Chatterjee
  • Two things here: 1) you can use jsonObject.getString("timestamp") instead of jsonObject.get("timestamp"); 2) what do you mean by "parse the json to get the required csv format within the pipeline"? Do you want to save some attributes of the JSON to a CSV file? Can you show an example?

    – Deadpool
    Nov 15 '18 at 3:10











  • Kafka has a JSONDeserializer, by the way

    – cricket_007
    Nov 15 '18 at 3:15
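
As a side note on the comment above: the JsonDeserializer that ships with Kafka (org.apache.kafka.connect.json.JsonDeserializer, in the connect-json artifact) yields Jackson JsonNode values. Below is a minimal sketch of using it with a plain consumer, assuming the topic and field names from the question; the group id is made up, and wiring it into Beam's KafkaIO would additionally require a Coder for JsonNode.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class JsonConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "json-sketch"); // hypothetical group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
        // Deserializes each record value straight to a Jackson JsonNode.
        props.put("value.deserializer", "org.apache.kafka.connect.json.JsonDeserializer");

        try (KafkaConsumer<Long, JsonNode> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            for (ConsumerRecord<Long, JsonNode> record : consumer.poll(Duration.ofSeconds(1))) {
                JsonNode json = record.value();
                // Same fields as the desired CSV output in the question.
                System.out.println(json.get("sender").asText() + ","
                        + json.get("timestamp").asText() + ","
                        + json.get("messageType").asText() + ","
                        + json.get("data").get("id").asText());
            }
        }
    }
}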
1 Answer
According to the documentation, you will need to write a transformation (or find one that matches your use case).



https://beam.apache.org/documentation/programming-guide/#composite-transforms



The documentation also provides an excellent example.



Example that should produce your output:



.apply(Values.<String>create())
.apply(
    "JSONtoData", // the transform name
    ParDo.of(new DoFn<String, String>() { // a DoFn as an anonymous inner class instance
        @ProcessElement
        public void processElement(@Element String word, OutputReceiver<String> out) {
            JSONParser parser = new JSONParser();
            Object obj = null;
            try {
                obj = parser.parse(word); // parse the incoming element, not an outer variable
            } catch (ParseException e) {
                e.printStackTrace();
            }
            JSONObject jsonObject = (JSONObject) obj;

            String sender = (String) jsonObject.get("sender");
            String messageType = (String) jsonObject.get("messageType");
            String timestamp = (String) jsonObject.get("timestamp");

            out.output(sender + "," + timestamp + "," + messageType);
        }
    }));
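
The question also asks for the id from the nested data object; with json-simple that is one more cast (a sketch, assuming the JSON shape shown in the question):

// Inside processElement, after the casts above:
JSONObject data = (JSONObject) jsonObject.get("data");
String id = (String) data.get("id");
out.output(sender + "," + timestamp + "," + messageType + "," + id);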


To return CSV values, just change the generics to:



new DoFn<String, YourCSVClassHere>()
OutputReceiver<YourCSVClassHere> out
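
One caveat with a custom output type: Beam needs a Coder for it. A minimal sketch, assuming a hypothetical MessageCsv class and Avro reflection via @DefaultCoder (one option among several):

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

// Hypothetical record type for the four CSV fields. AvroCoder uses
// reflection, so it needs accessible fields and a no-arg constructor.
@DefaultCoder(AvroCoder.class)
public class MessageCsv {
    public String sender;
    public String timestamp;
    public String messageType;
    public String id;

    public MessageCsv() {} // required for reflection-based coding
}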


I didn't test this code, use at own risk.






answered Nov 15 '18 at 3:13 by Friwi
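
For completeness, here is how the ParDo from this answer slots into the pipeline from the question, between Values.create() and TextIO.write(). This is an untested sketch under the same caveats; JsonToCsvFn is a hypothetical name for the DoFn shown above, pulled out of the anonymous class for readability.

p.apply(KafkaIO.<Long, String>read()
        .withBootstrapServers("localhost:9092")
        .withTopic("test")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object) "earliest"))
        .withMaxNumRecords(35)
        .withoutMetadata())
 .apply(Values.<String>create())
 .apply("JSONtoData", ParDo.of(new JsonToCsvFn())) // parse each JSON line to a CSV string
 .apply(TextIO.write().to("test"));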