PySpark: Partitioning and hashing multiple dataframes, then joining
Background: I am working with clinical data spread across many different .csv/.txt files. All of these files are patientID-based, but each has different fields. I am importing these files into DataFrames, which I will join at a later stage after first processing each DataFrame individually. Two example DataFrames (df_A and df_B) are shown below. Similarly, I have multiple DataFrames - df_A, df_B, df_C ... df_J - and I will join all of them at a later stage.



df_A = spark.read.schema(schema).format("csv").load(...)   # Just an example
df_A.show(3)
#Example 1:
+---------+--------------+
|patientID|diagnosis_code|
+---------+--------------+
|      A51|          XIII|
|      B22|            VI|
|      B13|            XV|
+---------+--------------+
df_B.show(3)
#Example 2:
+---------+----------+------+-------------+------+
|patientID|  hospital|  city|  doctor_name|  Bill|
+---------+----------+------+-------------+------+
|      A51|   Royal H|London|C.Braithwaite|451.23|
|      B22|Surgery K.| Leeds|      J.Small| 88.00|
|      B22|Surgery K.| Leeds|      J.Small|102.01|
+---------+----------+------+-------------+------+
print("Number of partitions: {}".format(df_A.rdd.getNumPartitions()))
print("Partitioner: {}".format(df_A.rdd.partitioner))

Number of partitions: 1  # With other DataFrames I get more partitions.
Partitioner: None


After reading all these .csv/.txt files into DataFrames, I can see that for some DataFrames the data sits on just one partition (as above), while other DataFrames end up with more partitions, depending on the size of the corresponding .csv/.txt file, which in turn determines the number of HDFS blocks created (128 MB default block size). We also don't have a partitioner at the moment.



Question: Would it not be a good idea to redistribute these DataFrames across multiple partitions, hashed on patientID, so that we avoid as much shuffling as possible when we join() these multiple DataFrames? If that is indeed desirable, should I repartition on patientID and use the same partitioner for all DataFrames (not sure if that's possible)? I have also read that the DataFrame API handles everything on its own, but shouldn't we explicitly specify hashing on the patientID column?



I would really appreciate it if someone could provide useful links or cues on what optimization strategy to employ when dealing with these multiple DataFrames, all patientID-based.










  • In general, if a DataFrame is small enough to occupy a single partition, it is also small enough to be broadcast automatically, so the problem is nonexistent. And in a broader sense, repartition is a shuffle, so if it is a one-off operation, it is a waste of time.

    – user6910411
    Nov 22 '18 at 14:12













  • Well, I have many DataFrames, and some of them are very big, occupying some 50 partitions (the number of my cores) by default. So when I join them, there will be a lot of shuffling across partitions if the DataFrames don't share the same partitioner. That's why I am thinking of partitioning all the DataFrames into, say, 50 partitions, and then using the same hashing function (don't know if that's possible) to partition all of them. If you have any suggestions, that would be very helpful.

    – cph_sto
    Nov 22 '18 at 14:28
python apache-spark hash pyspark hadoop-partitioning
asked Nov 22 '18 at 13:24 by cph_sto
edited Nov 22 '18 at 14:32