Spark CLUSTERED BY / bucketBy dataset not using memory
I recently came across Spark bucketBy/clusteredBy here.



I tried to mimic this for a 1.1 TB source file from S3 (already in parquet). The plan is to completely avoid shuffles, since most of the datasets are always joined on the "id" column. Here is what I am doing:



myDf.repartition(20)
  .write.partitionBy("day")
  .option("mode", "DROPMALFORMED")
  .option("compression", "snappy")
  .option("path", "s3://my-bucket/folder/1year_data_bucketed/")
  .mode("overwrite")
  .format("parquet")
  .bucketBy(20, "id")
  .sortBy("id")
  .saveAsTable("myTable1YearBucketed")
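
The other table I join with below, another_table_with_same_bucketing, is written the same way, since the shuffle-free join needs the same number of buckets on the same column on both sides. A minimal sketch of that write (the myDimDf DataFrame and the output path here are placeholders, not the real ones):

myDimDf.repartition(20)
  .write
  .option("compression", "snappy")
  .option("path", "s3://my-bucket/folder/dim_data_bucketed/")  // placeholder path
  .mode("overwrite")
  .format("parquet")
  .bucketBy(20, "id")  // same bucket count and column as the table above
  .sortBy("id")
  .saveAsTable("another_table_with_same_bucketing")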


On a different EMR cluster, I create a table and access it.



CREATE TABLE newtable_on_diff_cluster (id string, day date, col1 double, col2 double)
USING PARQUET
OPTIONS (path "s3://my-bucket/folder/1year_data_bucketed/")
CLUSTERED BY (id) INTO 20 BUCKETS


Then I create a Scala DataFrame and join it with another table that has the same 20 buckets on the id column.



val myTableBucketedDf = spark.table("newtable_on_diff_cluster")
val myDimTableBucketedDf = spark.table("another_table_with_same_bucketing")
val joinedOutput = myTableBucketedDf.join(myDimTableBucketedDf, "id")
joinedOutput.show()


Here are my questions:




  1. I see that even with repartition, the shuffle is still removed in the explain plan, which is good. Is there any issue with using repartition, partitionBy and bucketBy together in the above fashion?

  2. The above join does not appear to be using memory on my EMR cluster, going by Ganglia. When joining regular parquet files without bucketing, they seem to run faster, in memory, for a small number of day partitions. I haven't tested it for more days. How exactly is the join processed here? Is there any way to avoid the CREATE TABLE SQL statement and instead use the parquet metadata to define the table schema from Scala (see the sketch after this list)? I don't want to repeat the column names and data types when they are already available in the parquet.

  3. What is the ideal number of buckets, or individual file size after bucketBy, in terms of the available memory on the executor? If the number of unique values in the id column is in the ~100 MM range, then if I understand correctly, 20 buckets will give each bucket ~5 MM unique ids. I understand that the sortBy here is not respected because Spark produces multiple files per bucket. What is the recommendation for repartition / final file sizes / number of buckets in this case?
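
For reference, a minimal sketch of how I check the plan for Exchange (shuffle) nodes, and of the schema reuse I have in mind for question 2 (assuming Spark 2.4+ for StructType.toDDL; as far as I know the bucketing spec itself still has to be declared, since it lives in the metastore rather than in the parquet files):

// Verify the bucketed join avoids a shuffle: the physical plan should show a
// SortMergeJoin without an Exchange under either side.
val joined = spark.table("newtable_on_diff_cluster")
  .join(spark.table("another_table_with_same_bucketing"), "id")
joined.explain()

// Reuse the schema inferred from the parquet files (the "day" partition column
// is picked up by partition discovery) instead of typing the columns by hand.
val inferred = spark.read.parquet("s3://my-bucket/folder/1year_data_bucketed/").schema

spark.sql(s"""
  CREATE TABLE newtable_on_diff_cluster (${inferred.toDDL})
  USING PARQUET
  OPTIONS (path "s3://my-bucket/folder/1year_data_bucketed/")
  CLUSTERED BY (id) INTO 20 BUCKETS
""")
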
apache-spark join amazon-s3 amazon-emr hadoop-partitioning

asked Nov 20 at 18:03 by androboy, edited Nov 20 at 18:11