PySpark: Partitioning and hashing multiple dataframes, then joining
Background: I am working with clinical data spread across many different .csv/.txt files. All of these files are keyed on patientID but contain different fields. I am importing the files into DataFrames, which I will join at a later stage after first processing each DataFrame individually. Two example DataFrames (df_A and df_B) are shown below. Similarly, I have multiple DataFrames (df_A, df_B, df_C ... df_J) and I will join all of them at a later stage.
df_A = spark.read.schema(schema).format("csv").load(...).... # Just an example
df_A.show(3)
#Example 1:
+----------+-----------------+
| patientID| diagnosis_code|
+----------+-----------------+
| A51| XIII|
| B22| VI|
| B13| XV|
+----------+-----------------+
df_B.show(3)
#Example 2:
+-----------+----------+-------+-------------+--------+
| patientID| hospital| city| doctor_name| Bill|
+-----------+----------+-------+-------------+--------+
| A51| Royal H| London|C.Braithwaite| 451.23|
| B22|Surgery K.| Leeds| J.Small| 88.00|
| B22|Surgery K.| Leeds| J.Small| 102.01|
+-----------+----------+-------+-------------+--------+
print("Number of partitions: {}".format(df_A.rdd.getNumPartitions()))# Num of partitions: 1
print("Partitioner: {}".format(df_A.rdd.partitioner)) # Partitioner: None
Number of partitions: 1 #With other DataFrames I get more partitions.
Partitioner: None
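I also noticed that df.rdd.partitioner stays None even after a repartition; as far as I understand, Spark SQL's hash partitioning is not exposed as an RDD Partitioner, so I look at the physical plan instead. A small illustration (the partition count of 50 and the exact plan output below are just illustrative):

# Repartition by the join key, then check the plan rather than df.rdd.partitioner,
# which remains None for the DataFrame API even when the data is hash-partitioned.
df_A_part = df_A.repartition(50, "patientID")

print(df_A_part.rdd.getNumPartitions())  # 50
print(df_A_part.rdd.partitioner)         # still None

df_A_part.explain()
# == Physical Plan ==
# Exchange hashpartitioning(patientID#0, 50)
# +- FileScan csv ...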
After reading all these .csv/.txt files into DataFrames, I can see that for some DataFrames the data sits on just 1 partition (as above), while other DataFrames have more partitions, depending on the size of the corresponding .csv/.txt file, which in turn determines the number of blocks created (128 MB default block size in HDFS). We also don't have a partitioner at the moment.
Question: Now, wouldn't it be a good idea to redistribute these DataFrames across multiple partitions, hashed on patientID, so that we avoid as much shuffling as possible when we join() these multiple DataFrames? If that is indeed what is desired, should I repartition on patientID and use the same partitioner for all DataFrames (not sure if that's even possible)? I have also read that the DataFrame API handles this on its own, but shouldn't we specify hashing on the patientID column explicitly?
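What I have in mind is roughly the following sketch, hash-repartitioning every DataFrame on patientID with the same number of partitions before joining (the partition count of 50 is just illustrative, and I am not sure Spark will actually reuse the exchange):

from functools import reduce

N = 50  # illustrative; often chosen as a multiple of the number of cores

# Hash-repartition every DataFrame on the join key with the same partition count.
dfs = [df_A, df_B]  # extend with df_C ... df_J
dfs = [df.repartition(N, "patientID") for df in dfs]

# Chain the joins on patientID. With matching hash partitioning on both sides,
# Spark can typically skip an extra Exchange before the sort-merge join
# (it may still add sorts).
joined = reduce(lambda left, right: left.join(right, on="patientID", how="inner"), dfs)

joined.explain()  # verify that no additional Exchange appears before the join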
I would really appreciate any useful links or cues on what optimization strategy to employ when dealing with these multiple DataFrames, all keyed on patientID.
python apache-spark hash pyspark hadoop-partitioning
In general, if a DataFrame is small enough to occupy a single partition, it is also small enough to be broadcast automatically, so the problem is non-existent. In a broader sense, a repartition is itself a shuffle, so if the join is a one-off operation, the repartition is a waste of time.
– user6910411
Nov 22 '18 at 14:12
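For the small, single-partition DataFrames mentioned above, the broadcast can also be requested explicitly rather than relying on the spark.sql.autoBroadcastJoinThreshold size estimate; a minimal sketch, assuming df_A is the small side:

from pyspark.sql.functions import broadcast

# Broadcast-hash join: the small side is shipped to every executor,
# so the large side is never shuffled.
joined = df_B.join(broadcast(df_A), on="patientID", how="inner")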
Well, I have many DataFrames and some of them are very big, occupying some 50 partitions (the number of my cores) by default. So when I join them, there will be a lot of shuffling across partitions if the DataFrames don't all share the same partitioner. That is why I am thinking of repartitioning all DataFrames, on say 50 partitions, and then using the same hashing function (don't know if that's possible) to partition all of them. If you have any suggestion, that would be very helpful.
– cph_sto
Nov 22 '18 at 14:28
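Since a repartition is itself a shuffle, it only pays off if the repartitioned DataFrames are reused for several joins (or cached). When the same patientID joins are run repeatedly across jobs, bucketing the tables on write is one way to persist the hash partitioning; a minimal sketch, assuming table support is available, with an illustrative bucket count of 50 and hypothetical table names:

# Write each DataFrame bucketed (and sorted) by the join key; matching bucket
# counts let later joins on patientID avoid the shuffle.
(df_A.write
     .bucketBy(50, "patientID")
     .sortBy("patientID")
     .mode("overwrite")
     .saveAsTable("df_a_bucketed"))

(df_B.write
     .bucketBy(50, "patientID")
     .sortBy("patientID")
     .mode("overwrite")
     .saveAsTable("df_b_bucketed"))

joined = spark.table("df_a_bucketed").join(spark.table("df_b_bucketed"), "patientID")
joined.explain()  # ideally no Exchange appears before the SortMergeJoin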