Split an RDD into multiple RDDs based on value without doing `collect()` and `filter()` [duplicate]
This question already has an answer here:
How do I split an RDD into two or more RDDs? (4 answers)
I want to split an RDD into multiple RDDs based on a value in each row. The row values are known in advance and fixed in nature.
For example:
source_rdd = sc.parallelize([('a',1),('a',2),('a',3),('b',4),('b',5),('b',6)])
should be split into two RDDs, one containing only rows with key a and the other containing only rows with key b.
- I have tried the groupByKey method and was able to split successfully, but only after doing a collect() operation on the grouped RDD, which I cannot do in production due to memory constraints:
a_rdd, b_rdd = source_rdd.keyBy(lambda row: row[0]).groupByKey().collect()
- The current implementation applies a separate filter operation to get each RDD (see the persist sketch below):
a_rdd = source_rdd.filter(lambda row: row[0] == 'a')
b_rdd = source_rdd.filter(lambda row: row[0] == 'b')
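A minimal sketch of one common mitigation for the repeated scans, persisting the source so that each filter reads cached data instead of recomputing the lineage (assuming the standard pyspark StorageLevel API):
from pyspark import StorageLevel

# Cache once; the second and any later filter then reads the cached
# partitions instead of recomputing source_rdd from its lineage.
source_rdd.persist(StorageLevel.MEMORY_AND_DISK)

a_rdd = source_rdd.filter(lambda row: row[0] == 'a')
b_rdd = source_rdd.filter(lambda row: row[0] == 'b')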
Can this be optimized further? What would be the best way to do this in production with data that cannot fit in memory?
Usage: these RDDs will be converted into different DataFrames (one for each key), each with a different schema, and stored in S3 as output.
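For illustration, a sketch of that usage; the schema, column names, and S3 paths below are hypothetical, not taken from the original:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.getOrCreate()

# Hypothetical schema; per the question, each key's DataFrame
# may use a different one in practice.
schema = StructType([
    StructField("key", StringType(), False),
    StructField("value", IntegerType(), False),
])

a_df = spark.createDataFrame(a_rdd, schema)
b_df = spark.createDataFrame(b_rdd, schema)

# Hypothetical S3 output locations.
a_df.write.mode("overwrite").parquet("s3://example-bucket/output/a/")
b_df.write.mode("overwrite").parquet("s3://example-bucket/output/b/")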
Note: I would prefer a pyspark implementation. I have read a lot of Stack Overflow answers and blog posts, and have not yet found an approach that works for me.
I have already seen the question this was marked as a duplicate of, and I had already mentioned it in my question. I asked this question because the provided solution does not seem to be the most optimized way, and it is three years old.
apache-spark pyspark rdd amazon-emr
edited Nov 26 '18 at 10:30
asked Nov 26 '18 at 7:43
Sumit Kumar
marked as duplicate by eliasah, Nov 26 '18 at 9:19
This question has been asked before and already has an answer. If those answers do not fully address your question, please ask a new question.
@eliasah I have already seen the question this was marked as a duplicate of, and I had already mentioned it in my question. I asked this question because the provided solution does not seem to be the most optimized way, and it is three years old.
– Sumit Kumar
Nov 26 '18 at 10:09
To whoever downvoted the question: could you please comment the reason, so I can either improve or defend my question?
– Sumit Kumar
Nov 26 '18 at 11:50
If the duplicate has outdated answers, don't ask a new question; instead, you can place a bounty on the older question. "Current answers are outdated" is a valid and preselectable bounty reason. There are also high-rep users who are willing to spend their rep on such things if you ask nicely; you just have to find them in chat.
– Max Vollmer
Nov 26 '18 at 22:39
1 Answer
You can use toDF too. Also, a_rdd and b_rdd are not RDDs in your code, as they have been collected!
df = source_rdd.keyBy(lambda row: row[0]).groupByKey()
a_rdd = df.filter(lambda row: row[0] == 'a')
b_rdd = df.filter(lambda row: row[0] == 'b')
edited Nov 26 '18 at 10:23
answered Nov 26 '18 at 8:54
OmG
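For concreteness, the toDF route the answer mentions might look like this (requires an active SparkSession; the column names are assumptions):
# Column names here are assumed for the two-element tuples in the question.
a_df = source_rdd.filter(lambda row: row[0] == 'a').toDF(["key", "value"])
b_df = source_rdd.filter(lambda row: row[0] == 'b').toDF(["key", "value"])
a_df.show()  # displays the rows whose key is 'a'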
The method in the answer still needs to scan the whole DataFrame each time a new condition is required, which I want to avoid.
– Sumit Kumar
Nov 26 '18 at 10:14
@SumitKumar If you don't want DataFrames, use a filter over the RDD directly (the answer is updated).
– OmG
Nov 26 '18 at 10:24
Now it is the same as my implementation, which I want to further optimize if possible. Details are in my question body.
– Sumit Kumar
Nov 26 '18 at 10:29
@SumitKumar Nope, it is not the same. You collect it, but I do not; the filter is applied directly over the RDD.
– OmG
Nov 26 '18 at 10:31
collect is one methodology; I am already using the second methodology, same as yours, which does not use any collect or groupBy.
– Sumit Kumar
Nov 26 '18 at 10:54
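For the "scan only once" goal discussed above, one option is to shuffle by key once with a custom partitioner and persist the result, so extracting each per-key RDD becomes a partition selection rather than a full per-row rescan. A sketch, assuming the keys are known up front (as stated in the question):
from pyspark import StorageLevel

keys = ['a', 'b']                      # known in advance, per the question
index_of = {k: i for i, k in enumerate(keys)}

# One shuffle: all rows of a given key land in their own partition.
partitioned = (source_rdd
               .partitionBy(len(keys), lambda k: index_of[k])
               .persist(StorageLevel.MEMORY_AND_DISK))

def only_partition(target):
    # Non-target partitions are returned empty, without per-row filtering.
    return partitioned.mapPartitionsWithIndex(
        lambda i, it: it if i == target else iter([]))

a_rdd = only_partition(index_of['a'])
b_rdd = only_partition(index_of['b'])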