Writing a Custom Class to HDFS in Apache Flink

I am trying to get familiar with the semantics of Flink after having started with Spark. I would like to write a DataSet[IndexNode] to persistent storage in HDFS so that it can be read later by another process. Spark has a simple ObjectFile API that provides such functionality, but I cannot find a similar option in Flink.



case class IndexNode(vec: Vector[IndexNode],
                     id: Int) extends Serializable {
  // Getters and setters etc. here
}



The built-in sinks tend to serialize each instance via its toString method, which is not suitable here because of the nested structure of the class. I imagine the solution is to use a FileOutputFormat and provide a translation of the instances to a byte stream. However, I am not sure how to serialize the vector, which is of arbitrary length and can be many levels deep.










scala apache-flink

asked Nov 11 '18 at 23:26 by Jeppeks1
1 Answer
You can achieve this by using SerializedOutputFormat and SerializedInputFormat.



Try the following steps:




1. Make IndexNode implement the IOReadableWritable interface from Flink. Mark fields that cannot be serialized as @transient, and implement the write(out: DataOutputView) and read(in: DataInputView) methods. The write method writes out all the data held by the instance, and the read method reads it back and rebuilds the internal fields. For example, instead of serializing the arr field of the Result class below, I write out the values it is built from and then rebuild the array inside the read method.



import org.apache.flink.core.io.IOReadableWritable
import org.apache.flink.core.memory.{DataInputView, DataOutputView}

class Result(var name: String, var count: Int) extends IOReadableWritable {

  // Derived field: marked transient and rebuilt in read() instead of being serialized
  @transient
  var arr = Array(count, count)

  // No-arg constructor, required so Flink can instantiate the class during deserialization
  def this() = this("", 1)

  override def write(out: DataOutputView): Unit = {
    out.writeInt(count)
    out.writeUTF(name)
  }

  override def read(in: DataInputView): Unit = {
    count = in.readInt()
    name = in.readUTF()
    arr = Array(count, count)  // rebuild the transient field from the restored values
  }

  override def toString: String = s"$name, $count, ${arr.mkString("[", ", ", "]")}"
}





2. Write out the data with

   myDataSet.write(new SerializedOutputFormat[Result], "/tmp/test")

   and read it back with

   env.readFile(new SerializedInputFormat[Result], "/tmp/test")
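For completeness, a minimal round-trip sketch might look like the following. The RoundTrip object, the sample values, the /tmp/test path, and WriteMode.OVERWRITE are illustrative assumptions carried over from the snippets above, not part of the original answer.

import org.apache.flink.api.common.io.{SerializedInputFormat, SerializedOutputFormat}
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object RoundTrip {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Write a small DataSet[Result] in Flink's serialized binary format
    val results = env.fromElements(new Result("a", 1), new Result("b", 2))
    results.write(new SerializedOutputFormat[Result], "/tmp/test", WriteMode.OVERWRITE)
    env.execute("write results")

    // Read the records back; read() rebuilds each transient arr field
    val restored = env.readFile(new SerializedInputFormat[Result], "/tmp/test")
    restored.print()
  }
}

The same path would point at HDFS in a cluster setup (e.g. an hdfs:// URI instead of the local /tmp/test used here).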






answered Nov 13 '18 at 23:03 by David
• Thanks, that does work for a simpler use case I had. However, I don't think the answer addresses how to handle the nested structure. Do I have to traverse all the elements in the vector (and their vectors) and repeatedly write the elements? – Jeppeks1, Nov 17 '18 at 14:59










• If the Vector class provides some method to write to / read from string or bytes, then you do not have to traverse. – David, Nov 18 '18 at 11:34
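The comment thread leaves the traversal question open, so here is a minimal sketch of the explicit-traversal approach. It assumes IndexNode is reworked from a case class into a mutable class (Flink needs a no-arg constructor and settable fields to deserialize in place), and the length-prefixed encoding is my illustration rather than anything from the answer above.

import org.apache.flink.core.io.IOReadableWritable
import org.apache.flink.core.memory.{DataInputView, DataOutputView}

// Illustrative rework of IndexNode with mutable fields and a no-arg
// constructor so Flink can instantiate it during deserialization.
class IndexNode(var vec: Vector[IndexNode], var id: Int) extends IOReadableWritable {

  def this() = this(Vector.empty, 0)

  override def write(out: DataOutputView): Unit = {
    out.writeInt(id)
    out.writeInt(vec.length)   // length prefix tells read() how many children follow
    vec.foreach(_.write(out))  // recurse into each nested node
  }

  override def read(in: DataInputView): Unit = {
    id = in.readInt()
    val n = in.readInt()
    vec = Vector.fill(n) {
      val child = new IndexNode()
      child.read(in)           // recurse to rebuild each nested node
      child
    }
  }
}

Note that this recurses once per nesting level, so an extremely deep tree might need an iterative encoding instead.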









