Handling schema mismatches in Spark

I am reading a CSV file using Spark in Scala.
The schema is predefined and I am using it for the read.
This is the example code:

// create the schema
val schema = StructType(Array(
  StructField("col1", IntegerType, false),
  StructField("col2", StringType, false),
  StructField("col3", StringType, true)))

// Initialize Spark session
val spark: SparkSession = SparkSession.builder
  .appName("Parquet Converter")
  .getOrCreate

// Create a DataFrame from a CSV file
val dataFrame: DataFrame = spark.read.format("csv")
  .schema(schema)
  .option("header", false)
  .load(inputCsvPath)


From what I read, when reading a CSV with Spark using a schema, there are three mode options:



  1. Set mode to DROPMALFORMED --> this drops the lines that don't match the schema

  2. Set mode to PERMISSIVE --> this keeps the line but sets the fields that can't be parsed to null

  3. Set mode to FAILFAST --> this throws an exception when a mismatch is discovered

What is the best way to combine these options? The behaviour I want is: detect the schema mismatches, print them as errors, and leave those lines out of my DataFrame.
Basically, I want a combination of FAILFAST and DROPMALFORMED.
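For reference, all three modes are set through the same reader option. A minimal sketch, reusing the schema and path from above (FAILFAST shown; the other two modes are passed the same way, and failFastDf is just an illustrative name):

// Sketch only: the parse mode is just another reader option.
// Swap "FAILFAST" for "DROPMALFORMED" or "PERMISSIVE" as needed.
val failFastDf: DataFrame = spark.read.format("csv")
  .schema(schema)
  .option("header", false)
  .option("mode", "FAILFAST")  // throws on the first malformed record
  .load(inputCsvPath)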



Thanks in advance


apache-spark

asked Nov 14 '18 at 8:55 by Ben Hoffman, edited Nov 14 '18 at 10:48 by executable


2 Answers


Just use DROPMALFORMED and follow the log. If malformed records are present, they are dumped to the log, up to the limit set by the maxMalformedLogPerPartition option.





spark.read.format("csv")
  .schema(schema)
  .option("header", false)
  .option("mode", "DROPMALFORMED")
  .option("maxMalformedLogPerPartition", 128)
  .load(inputCsvPath)
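Not part of the original answer, but if you also want to know how many rows were dropped, one rough sketch is to compare the count against a raw, schema-less read of the same file (note this costs a second pass over the data):

// Sketch only: estimate the number of dropped rows.
val parsed = spark.read.format("csv")
  .schema(schema)
  .option("header", false)
  .option("mode", "DROPMALFORMED")
  .load(inputCsvPath)
val raw = spark.read.format("csv")
  .option("header", false)
  .load(inputCsvPath)
println(s"Dropped ${raw.count() - parsed.count()} malformed rows")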





answered Nov 14 '18 at 9:33 by user10651087

  • Where does this malformed record get dumped, and at which location is it stored? Do we need to specify the location as well?

    – vikrant rana
    Dec 13 '18 at 9:38


This is what I eventually did:

I added the "_corrupt_record" column to the schema, for example:

val schema = StructType(Array(
  StructField("col1", IntegerType, true),
  StructField("col2", StringType, false),
  StructField("col3", StringType, true),
  StructField("_corrupt_record", StringType, true)))


Then I read the CSV using PERMISSIVE mode (it is Spark's default):



val dataFrame: DataFrame = spark.read.format("csv")
  .schema(schema)
  .option("header", false)
  .option("mode", "PERMISSIVE")
  .load(inputCsvPath)


Now my DataFrame holds an additional column that captures the rows with schema mismatches.
I filtered the rows that have mismatched data and printed them:



val badRows = dataFrame.filter("_corrupt_record is not null")
badRows.cache()
badRows.show()
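A small follow-up sketch (not in the original answer) to finish the stated goal of ignoring those lines: keep the complement of badRows and drop the helper column. Some Spark versions disallow queries that reference only the internal corrupt-record column, so caching the DataFrame first is a safe habit.

// Sketch: clean rows only, restored to the original three-column shape.
dataFrame.cache()
val goodRows = dataFrame
  .filter("_corrupt_record is null")
  .drop("_corrupt_record")
goodRows.show()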





answered Dec 24 '18 at 8:42 by Ben Hoffman, edited Feb 23 at 23:28