Spark Clustered By/Bucket by dataset not using memory
I recently came across Spark bucketBy/CLUSTERED BY here.
I tried to replicate this for a 1.1 TB source file on S3 (already in Parquet). The plan is to avoid shuffles completely, since most of the datasets are always joined on the "id" column. Here is what I am doing:
myDf.repartition(20)
  .write.partitionBy("day")
  .option("mode", "DROPMALFORMED") // note: DROPMALFORMED is a CSV/JSON read-side parse mode; a Parquet write ignores it
  .option("compression", "snappy")
  .option("path", "s3://my-bucket/folder/1year_data_bucketed/").mode("overwrite")
  .format("parquet").bucketBy(20, "id").sortBy("id").saveAsTable("myTable1YearBucketed")
On a different EMR cluster, I create a table and access it.
CREATE TABLE newtable_on_diff_cluster (id string, day date, col1 double, col2 double) USING PARQUET OPTIONS (
path "s3://my-bucket/folder/1year_data_bucketed/"
)
CLUSTERED BY (id) INTO 20 BUCKETS
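A minimal sketch of generating that statement from the Parquet schema instead of typing the columns by hand, assuming Spark 2.4+ (for `StructType.toDDL`). The names `BucketedTableDdl` and `createTableSql` are hypothetical, and the generated statement has the same shape as the hand-written one above, including the absence of a PARTITIONED BY clause.

```scala
import org.apache.spark.sql.SparkSession

object BucketedTableDdl {
  // Builds a CREATE TABLE statement whose column list comes from the schema
  // Spark infers from the Parquet footers (plus partition columns discovered
  // from directory names such as day=...), instead of typing it by hand.
  // Assumes Spark 2.4+, where StructType.toDDL is available.
  def createTableSql(spark: SparkSession, table: String, path: String,
                     bucketCol: String, numBuckets: Int): String = {
    // Schema inference lists the files and reads Parquet footers; rows are not scanned.
    val columns = spark.read.parquet(path).schema.toDDL
    s"""CREATE TABLE $table ($columns) USING PARQUET
       |OPTIONS (path "$path")
       |CLUSTERED BY ($bucketCol) INTO $numBuckets BUCKETS""".stripMargin
  }
}
```

Usage would look like `spark.sql(BucketedTableDdl.createTableSql(spark, "newtable_on_diff_cluster", "s3://my-bucket/folder/1year_data_bucketed/", "id", 20))`. Because only the column list is derived, anything that holds for the hand-written statement (for example how `day` is declared) holds for the generated one too.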
Then I create a Scala DataFrame and join it with another table that is bucketed the same way (20 buckets on the id column).
val myTableBucketedDf = spark.table("newtable_on_diff_cluster")
val myDimTableBucketedDf = spark.table("another_table_with_same_bucketing")
val joinedOutput = myTableBucketedDf.join(myDimTableBucketedDf, "id")
joinedOutput.show()
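One way to check whether a shuffle is actually planned for a join like this is to count the shuffle exchange operators in the physical plan. The sketch below does that on two small, hypothetical bucketed tables (`toy_fact_bucketed`, `toy_dim_bucketed`) in local mode. It assumes Spark 2.3+ for `ShuffleExchangeExec`, and it turns off adaptive query execution and broadcast joins so the plan is a plain sort-merge join.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

object BucketJoinShuffleCheck {
  // Number of shuffle Exchange operators in the physical plan of `df`.
  // Assumes Spark 2.3+ (ShuffleExchangeExec) and adaptive query execution off:
  // with AQE the executed plan is wrapped in a leaf node, so collect() would
  // not see the exchanges inside it.
  def shuffleCount(df: DataFrame): Int =
    df.queryExecution.executedPlan.collect { case s: ShuffleExchangeExec => s }.size

  // Writes two hypothetical toy tables bucketed by "id" into the same number
  // of buckets (external tables under baseDir, like the question's path option)
  // and joins them on "id".
  def bucketedJoin(spark: SparkSession, numBuckets: Int, baseDir: String): DataFrame = {
    import spark.implicits._
    val fact = (1 to 1000).map(i => (i.toString, i.toDouble)).toDF("id", "col1")
    val dim = (1 to 1000).map(i => (i.toString, i * 2.0)).toDF("id", "col2")
    fact.write.option("path", s"$baseDir/toy_fact_bucketed").mode("overwrite")
      .bucketBy(numBuckets, "id").sortBy("id").saveAsTable("toy_fact_bucketed")
    dim.write.option("path", s"$baseDir/toy_dim_bucketed").mode("overwrite")
      .bucketBy(numBuckets, "id").sortBy("id").saveAsTable("toy_dim_bucketed")
    spark.table("toy_fact_bucketed").join(spark.table("toy_dim_bucketed"), "id")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("bucket-join-shuffle-check")
      .config("spark.sql.adaptive.enabled", "false")
      // Stop the small toy tables from being broadcast, so a sort-merge join is planned.
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")
      .getOrCreate()
    val baseDir = Files.createTempDirectory("bucket-join").toString
    val joined = bucketedJoin(spark, 4, baseDir)
    joined.explain()
    // 0 means the planner treated both sides as already hash-partitioned on "id".
    println(s"shuffle exchanges: ${shuffleCount(joined)}")
    spark.stop()
  }
}
```

Counting plan nodes rather than searching the `explain()` text avoids matching `BroadcastExchange`, which also contains the word "Exchange" but is not a shuffle.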
Here are my questions:
- Even with the repartition, the explain plan shows that the shuffle is removed, which is good. Is there any issue with combining repartition, partitionBy, and bucketBy the way I do above?
- Judging from Ganglia, the above join does not appear to use memory on my EMR cluster. When I join regular (non-bucketed) Parquet files, they seem to run faster in memory for a small number of day partitions; I haven't tested with more days. How exactly is the join processed here? Also, is there any way to avoid the CREATE TABLE SQL statement and instead use the Parquet metadata to define the table schema from Scala? I don't want to repeat the column names and data types when they are already available in the Parquet files.
- What is the ideal number of buckets, or individual file size after bucketBy, relative to the memory available on each executor? If the number of unique values in the ID column is around 100 MM, then, if I understand correctly, 20 buckets will put about 5 MM unique IDs in each bucket. I understand that the sortBy here is not respected because Spark produces multiple files per bucket with bucketBy. What is the recommendation for repartition / final file sizes / number of buckets in this case?
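The sizing arithmetic in the last question can be made explicit. This is a back-of-envelope sketch only: the 1.1 TB and ~100 MM ids come from the question, while even hashing of ids, 365 daily `day` partitions, output roughly the same size as the input, and decimal units are assumptions, not measurements. `BucketSizing` is a hypothetical name.

```scala
object BucketSizing {
  // Inputs: the first two come from the question, the rest are assumptions.
  val totalBytes: Double = 1.1e12 // 1.1 TB, decimal units
  val uniqueIds: Double = 100e6   // ~100 MM distinct ids
  val numBuckets: Int = 20        // bucketBy(20, "id")
  val writeTasks: Int = 20        // repartition(20), round-robin
  val numDays: Int = 365          // assumption: one year of daily partitions

  // Distinct ids per bucket, assuming an even hash spread.
  def idsPerBucket: Double = uniqueIds / numBuckets
  // Data per bucket across all days, then per (day, bucket) pair.
  def bytesPerBucket: Double = totalBytes / numBuckets
  def bytesPerBucketPerDay: Double = bytesPerBucket / numDays
  // A round-robin repartition means any task may hold rows for any (day, bucket)
  // pair, so the write can produce up to tasks * buckets * days files.
  def maxFiles: Long = writeTasks.toLong * numBuckets * numDays
  // Average file size if that upper bound is reached.
  def avgFileBytesAtMaxFiles: Double = totalBytes / maxFiles

  def main(args: Array[String]): Unit = {
    println(f"ids per bucket:           $idsPerBucket%.0f")
    println(f"GB per bucket:            ${bytesPerBucket / 1e9}%.1f")
    println(f"MB per bucket per day:    ${bytesPerBucketPerDay / 1e6}%.1f")
    println(s"max files written:        $maxFiles")
    println(f"MB per file at max files: ${avgFileBytesAtMaxFiles / 1e6}%.1f")
  }
}
```

Under these assumptions each bucket holds about 5 MM ids and roughly 55 GB in total, or about 150 MB per day. If every write task ends up writing every (day, bucket) pair, that is 146,000 files averaging about 7.5 MB each.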
apache-spark join amazon-s3 amazon-emr hadoop-partitioning
edited Nov 20 at 18:11
asked Nov 20 at 18:03
androboy