Spark SQL - loading csv/psv files with some malformed records










We are loading hierarchies of directories of files with Spark and converting them to Parquet. There are tens of gigabytes in hundreds of pipe-separated files. Some are pretty big themselves.



Roughly every 100th file has a row or two with an extra delimiter that makes the whole process (or that file) abort.



We are loading using:



sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", format("header"))
  .option("delimiter", format("delimeter"))
  .option("quote", format("quote"))
  .option("escape", format("escape"))
  .option("charset", "UTF-8")
  // Column types are unnecessary for our current use cases.
  //.option("inferschema", "true")
  .load(glob)


Is there any extension or event-handling mechanism in Spark that we could attach to the row-reading logic so that, if a malformed row is encountered, it just skips the row instead of failing the whole process?



(We are planning to do more pre-processing, but this would be the most immediate and critical fix.)










csv apache-spark apache-spark-sql parquet






asked Dec 18 '15 at 2:04 by Edmon, edited Dec 18 '15 at 2:10
1 Answer






































In your case it may not be the Spark parsing itself that fails; rather, the default mode is actually PERMISSIVE, so the reader parses best-effort into a malformed record that then causes problems further downstream in your processing logic.

You should be able to simply add the option:

.option("mode", "DROPMALFORMED")

like this:

sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", format("header"))
  .option("delimiter", format("delimeter"))
  .option("quote", format("quote"))
  .option("escape", format("escape"))
  .option("charset", "UTF-8")
  // Column types are unnecessary for our current use cases.
  //.option("inferschema", "true")
  .option("mode", "DROPMALFORMED")
  .load(glob)

and it will skip lines with an incorrect number of delimiters, or that don't match the schema, rather than letting them cause errors later on in the code.
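
For reference, in Spark 2.0 and later the CSV source is built in and supports the same mode option, so the spark-csv package is no longer needed there. A minimal sketch along those lines, assuming a SparkSession named spark, with placeholder paths and delimiter standing in for the original format(...) helper and glob:

// Spark 2.0+: the csv data source is built in; the same DROPMALFORMED mode applies.
// `spark`, the paths, and the delimiter below are placeholders, not from the original post.
val cleaned = spark.read
  .option("header", "true")
  .option("delimiter", "|")
  .option("encoding", "UTF-8")
  .option("mode", "DROPMALFORMED")   // silently skip rows that do not parse
  .csv("/data/raw/*.psv")

cleaned.write.parquet("/data/parquet")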






answered Dec 18 '15 at 2:27 by Dennis Huo, edited Nov 12 '18 at 16:11 by user6910411
• One caveat is that you silently drop data from your input data – ski_squaw, May 11 '18 at 20:55
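
If silently dropping rows is a concern, an alternative with the built-in CSV reader (Spark 2.x+, again assuming a SparkSession named spark) is to stay in PERMISSIVE mode and route unparseable lines into a corrupt-record column, so they can be counted and inspected before the clean rows are written to Parquet. A rough sketch; the schema, column names, and paths are illustrative placeholders rather than anything from the original post:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{StructType, StructField, StringType}

// The schema must list the corrupt-record column explicitly for it to be populated.
val schema = StructType(Seq(
  StructField("col_a", StringType),
  StructField("col_b", StringType),
  StructField("col_c", StringType),
  StructField("_corrupt_record", StringType)   // receives the raw text of bad lines
))

val df = spark.read
  .schema(schema)
  .option("header", "true")
  .option("delimiter", "|")
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .csv("/data/raw/*.psv")
  .cache()   // cache before filtering on only the corrupt-record column (Spark 2.3+ restriction)

// Inspect what would otherwise have been dropped.
df.filter(col("_corrupt_record").isNotNull)
  .select("_corrupt_record")
  .show(20, truncate = false)

// Write only the rows that parsed cleanly.
df.filter(col("_corrupt_record").isNull)
  .drop("_corrupt_record")
  .write.parquet("/data/parquet")

Counting the captured lines per run gives a quick signal of how much data would have been dropped silently under DROPMALFORMED.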









