Handling schema mismatches in Spark
I am reading a csv file using Spark in Scala.
The schema is predefined and i am using it for reading.
This is the esample code:
// create the schema
val schema= StructType(Array(
StructField("col1", IntegerType,false),
StructField("col2", StringType,false),
StructField("col3", StringType,true)))
// Initialize Spark session
val spark: SparkSession = SparkSession.builder
.appName("Parquet Converter")
.getOrCreate
// Create a data frame from a csv file
val dataFrame: DataFrame =
spark.read.format("csv").schema(schema).option("header", false).load(inputCsvPath)
From what i read when reading cav with Spark using a schema there are 3 options:
- Set mode to
DROPMALFORMED
--> this will drop the lines that don't match the schema - Set mode to
PERMISSIVE
--> this will set the whole line to null values - Set mode to
FAILFAST
--> this will throw an exception when a mismatch is discovered
What is the best way to combine the options? The behaviour I want is to get the mismatches in the schema, print them as errors and ignoring the lines in my data frame.
Basically, I want a combination of FAILFAST and DROPMALFORMED.
Thanks in advance
apache-spark
add a comment |
I am reading a csv file using Spark in Scala.
The schema is predefined and i am using it for reading.
This is the esample code:
// create the schema
val schema= StructType(Array(
StructField("col1", IntegerType,false),
StructField("col2", StringType,false),
StructField("col3", StringType,true)))
// Initialize Spark session
val spark: SparkSession = SparkSession.builder
.appName("Parquet Converter")
.getOrCreate
// Create a data frame from a csv file
val dataFrame: DataFrame =
spark.read.format("csv").schema(schema).option("header", false).load(inputCsvPath)
From what i read when reading cav with Spark using a schema there are 3 options:
- Set mode to
DROPMALFORMED
--> this will drop the lines that don't match the schema - Set mode to
PERMISSIVE
--> this will set the whole line to null values - Set mode to
FAILFAST
--> this will throw an exception when a mismatch is discovered
What is the best way to combine the options? The behaviour I want is to get the mismatches in the schema, print them as errors and ignoring the lines in my data frame.
Basically, I want a combination of FAILFAST and DROPMALFORMED.
Thanks in advance
apache-spark
add a comment |
I am reading a csv file using Spark in Scala.
The schema is predefined and i am using it for reading.
This is the esample code:
// create the schema
val schema= StructType(Array(
StructField("col1", IntegerType,false),
StructField("col2", StringType,false),
StructField("col3", StringType,true)))
// Initialize Spark session
val spark: SparkSession = SparkSession.builder
.appName("Parquet Converter")
.getOrCreate
// Create a data frame from a csv file
val dataFrame: DataFrame =
spark.read.format("csv").schema(schema).option("header", false).load(inputCsvPath)
From what i read when reading cav with Spark using a schema there are 3 options:
- Set mode to
DROPMALFORMED
--> this will drop the lines that don't match the schema - Set mode to
PERMISSIVE
--> this will set the whole line to null values - Set mode to
FAILFAST
--> this will throw an exception when a mismatch is discovered
What is the best way to combine the options? The behaviour I want is to get the mismatches in the schema, print them as errors and ignoring the lines in my data frame.
Basically, I want a combination of FAILFAST and DROPMALFORMED.
Thanks in advance
apache-spark
I am reading a csv file using Spark in Scala.
The schema is predefined and i am using it for reading.
This is the esample code:
// create the schema
val schema= StructType(Array(
StructField("col1", IntegerType,false),
StructField("col2", StringType,false),
StructField("col3", StringType,true)))
// Initialize Spark session
val spark: SparkSession = SparkSession.builder
.appName("Parquet Converter")
.getOrCreate
// Create a data frame from a csv file
val dataFrame: DataFrame =
spark.read.format("csv").schema(schema).option("header", false).load(inputCsvPath)
From what i read when reading cav with Spark using a schema there are 3 options:
- Set mode to
DROPMALFORMED
--> this will drop the lines that don't match the schema - Set mode to
PERMISSIVE
--> this will set the whole line to null values - Set mode to
FAILFAST
--> this will throw an exception when a mismatch is discovered
What is the best way to combine the options? The behaviour I want is to get the mismatches in the schema, print them as errors and ignoring the lines in my data frame.
Basically, I want a combination of FAILFAST and DROPMALFORMED.
Thanks in advance
apache-spark
apache-spark
edited Nov 14 '18 at 10:48
executable
1,8902823
1,8902823
asked Nov 14 '18 at 8:55
Ben HoffmanBen Hoffman
2215
2215
add a comment |
add a comment |
2 Answers
2
active
oldest
votes
Just use DROPMALFORMED
and follow the log. If malformed records are present there are dumped to the log, up to the limit set by maxMalformedLogPerPartition
option.
spark.read.format("csv")
.schema(schema)
.option("header", false)
.option("mode", "DROPMALFORMED")
.option("maxMalformedLogPerPartition", 128)
.load(inputCsvPath)
where this malformed record gets dumped and stored at which location? Do we need to specify the location as well?
– vikrant rana
Dec 13 '18 at 9:38
add a comment |
This is what I eventually did:
I added to the schema the "_corrupt_record" column, for example:
val schema= StructType(Array(
StructField("col1", IntegerType,true),
StructField("col2", StringType,false),
StructField("col3", StringType,true),
StructField("_corrupt_record", StringType, true)))
Then I read the CSV using PERMISSIVE mode (it is Spark default):
val dataFrame: DataFrame = spark.read.format("csv")
.schema(schema)
.option("header", false)
.option("mode", "PERMISSIVE")
.load(inputCsvPath)
Now my data frame holds an additional column that holds the rows with schema mismatches.
I filtered the rows that have mismatched data and printed it:
val badRows = dataFrame.filter("_corrupt_record is not null")
badRows.cache()
badRows.show()
add a comment |
Your Answer
StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");
StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "1"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);
else
createEditor();
);
function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);
);
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53296257%2fhandling-schema-mismatches-in-spark%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
2 Answers
2
active
oldest
votes
2 Answers
2
active
oldest
votes
active
oldest
votes
active
oldest
votes
Just use DROPMALFORMED
and follow the log. If malformed records are present there are dumped to the log, up to the limit set by maxMalformedLogPerPartition
option.
spark.read.format("csv")
.schema(schema)
.option("header", false)
.option("mode", "DROPMALFORMED")
.option("maxMalformedLogPerPartition", 128)
.load(inputCsvPath)
where this malformed record gets dumped and stored at which location? Do we need to specify the location as well?
– vikrant rana
Dec 13 '18 at 9:38
add a comment |
Just use DROPMALFORMED
and follow the log. If malformed records are present there are dumped to the log, up to the limit set by maxMalformedLogPerPartition
option.
spark.read.format("csv")
.schema(schema)
.option("header", false)
.option("mode", "DROPMALFORMED")
.option("maxMalformedLogPerPartition", 128)
.load(inputCsvPath)
where this malformed record gets dumped and stored at which location? Do we need to specify the location as well?
– vikrant rana
Dec 13 '18 at 9:38
add a comment |
Just use DROPMALFORMED
and follow the log. If malformed records are present there are dumped to the log, up to the limit set by maxMalformedLogPerPartition
option.
spark.read.format("csv")
.schema(schema)
.option("header", false)
.option("mode", "DROPMALFORMED")
.option("maxMalformedLogPerPartition", 128)
.load(inputCsvPath)
Just use DROPMALFORMED
and follow the log. If malformed records are present there are dumped to the log, up to the limit set by maxMalformedLogPerPartition
option.
spark.read.format("csv")
.schema(schema)
.option("header", false)
.option("mode", "DROPMALFORMED")
.option("maxMalformedLogPerPartition", 128)
.load(inputCsvPath)
answered Nov 14 '18 at 9:33
user10651087
where this malformed record gets dumped and stored at which location? Do we need to specify the location as well?
– vikrant rana
Dec 13 '18 at 9:38
add a comment |
where this malformed record gets dumped and stored at which location? Do we need to specify the location as well?
– vikrant rana
Dec 13 '18 at 9:38
where this malformed record gets dumped and stored at which location? Do we need to specify the location as well?
– vikrant rana
Dec 13 '18 at 9:38
where this malformed record gets dumped and stored at which location? Do we need to specify the location as well?
– vikrant rana
Dec 13 '18 at 9:38
add a comment |
This is what I eventually did:
I added to the schema the "_corrupt_record" column, for example:
val schema= StructType(Array(
StructField("col1", IntegerType,true),
StructField("col2", StringType,false),
StructField("col3", StringType,true),
StructField("_corrupt_record", StringType, true)))
Then I read the CSV using PERMISSIVE mode (it is Spark default):
val dataFrame: DataFrame = spark.read.format("csv")
.schema(schema)
.option("header", false)
.option("mode", "PERMISSIVE")
.load(inputCsvPath)
Now my data frame holds an additional column that holds the rows with schema mismatches.
I filtered the rows that have mismatched data and printed it:
val badRows = dataFrame.filter("_corrupt_record is not null")
badRows.cache()
badRows.show()
add a comment |
This is what I eventually did:
I added to the schema the "_corrupt_record" column, for example:
val schema= StructType(Array(
StructField("col1", IntegerType,true),
StructField("col2", StringType,false),
StructField("col3", StringType,true),
StructField("_corrupt_record", StringType, true)))
Then I read the CSV using PERMISSIVE mode (it is Spark default):
val dataFrame: DataFrame = spark.read.format("csv")
.schema(schema)
.option("header", false)
.option("mode", "PERMISSIVE")
.load(inputCsvPath)
Now my data frame holds an additional column that holds the rows with schema mismatches.
I filtered the rows that have mismatched data and printed it:
val badRows = dataFrame.filter("_corrupt_record is not null")
badRows.cache()
badRows.show()
add a comment |
This is what I eventually did:
I added to the schema the "_corrupt_record" column, for example:
val schema= StructType(Array(
StructField("col1", IntegerType,true),
StructField("col2", StringType,false),
StructField("col3", StringType,true),
StructField("_corrupt_record", StringType, true)))
Then I read the CSV using PERMISSIVE mode (it is Spark default):
val dataFrame: DataFrame = spark.read.format("csv")
.schema(schema)
.option("header", false)
.option("mode", "PERMISSIVE")
.load(inputCsvPath)
Now my data frame holds an additional column that holds the rows with schema mismatches.
I filtered the rows that have mismatched data and printed it:
val badRows = dataFrame.filter("_corrupt_record is not null")
badRows.cache()
badRows.show()
This is what I eventually did:
I added to the schema the "_corrupt_record" column, for example:
val schema= StructType(Array(
StructField("col1", IntegerType,true),
StructField("col2", StringType,false),
StructField("col3", StringType,true),
StructField("_corrupt_record", StringType, true)))
Then I read the CSV using PERMISSIVE mode (it is Spark default):
val dataFrame: DataFrame = spark.read.format("csv")
.schema(schema)
.option("header", false)
.option("mode", "PERMISSIVE")
.load(inputCsvPath)
Now my data frame holds an additional column that holds the rows with schema mismatches.
I filtered the rows that have mismatched data and printed it:
val badRows = dataFrame.filter("_corrupt_record is not null")
badRows.cache()
badRows.show()
edited Feb 23 at 23:28
rwp
1,1992517
1,1992517
answered Dec 24 '18 at 8:42
Ben HoffmanBen Hoffman
2215
2215
add a comment |
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53296257%2fhandling-schema-mismatches-in-spark%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown