Split an RDD into multiple RDDs based on value without doing `collect()` and `filter()` [duplicate]

This question already has an answer here:




  • How do I split an RDD into two or more RDDs?

    4 answers




I want to split an RDD into multiple RDDs based on a value in each row. The row values are known in advance and are fixed.



For example:



source_rdd = sc.parallelize([('a',1),('a',2),('a',3),('b',4),('b',5),('b',6)])


should be split into two RDDs, one containing only rows with key 'a' and the other only rows with key 'b'.
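That is, the desired outcome for this small example would be (collect() is used here only to display the tiny sample, not as part of the real job):

a_rdd.collect()  # expected: [('a', 1), ('a', 2), ('a', 3)]
b_rdd.collect()  # expected: [('b', 4), ('b', 5), ('b', 6)]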




  1. I have tried the groupByKey method and can do it successfully after a collect() operation on the grouped RDD, which I cannot do in production due to memory constraints:


a_rdd, b_rdd = source_rdd.keyBy(lambda row: row[0]).groupByKey().collect()



  2. The current implementation is to apply a separate filter operation to get each RDD:


a_rdd = source_rdd.filter(lambda row: row[0] == 'a')
b_rdd = source_rdd.filter(lambda row: row[0] == 'b')


Can this be optimized further? What would be the best way to do it in production with data that cannot fit in memory?
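A minimal sketch of one partial mitigation, assuming the cluster has enough memory or local disk to cache the data: persist the source RDD once, so that each per-key filter scans the cached partitions instead of recomputing the lineage or re-reading the input for every key.

from pyspark import StorageLevel

# Cache the source once; every subsequent filter then scans the cached
# partitions rather than re-reading or recomputing the input per key.
source_rdd.persist(StorageLevel.MEMORY_AND_DISK)

a_rdd = source_rdd.filter(lambda row: row[0] == 'a')
b_rdd = source_rdd.filter(lambda row: row[0] == 'b')

# ... run the actions that consume a_rdd and b_rdd (e.g. the S3 writes) ...

source_rdd.unpersist()  # release the cache once the outputs are materialized

This still performs one filter pass per key, but over cached data rather than the raw input; as far as I know, plain RDDs offer no single transformation that splits one RDD into several in one pass.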



Usage: These RDDs will be converted into different DataFrames (one for each key), each with a different schema, and stored in S3 as output.
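For illustration only, a rough sketch of that last step, assuming a SparkSession named spark; the column names, schemas, bucket and paths below are placeholders, not the real per-key schemas or locations:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Placeholder schemas -- in practice each key has its own, different schema.
schema_a = StructType([StructField('key', StringType()), StructField('value', IntegerType())])
schema_b = StructType([StructField('key', StringType()), StructField('value', IntegerType())])

a_df = spark.createDataFrame(a_rdd, schema_a)
b_df = spark.createDataFrame(b_rdd, schema_b)

# Illustrative output locations only.
a_df.write.mode('overwrite').parquet('s3://some-bucket/output/a/')
b_df.write.mode('overwrite').parquet('s3://some-bucket/output/b/')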



Note: I would prefer a PySpark implementation. I have read a lot of Stack Overflow answers and blog posts, and have not yet found an approach that works for me.



I have already seen the question this has been marked as a duplicate of, and I have already mentioned it in my question. I asked this question because the provided solution does not seem to be the most optimised way and is 3 years old.










apache-spark pyspark rdd amazon-emr

asked Nov 26 '18 at 7:43 by Sumit Kumar (edited Nov 26 '18 at 10:30)

marked as duplicate by eliasah (apache-spark) Nov 26 '18 at 9:19

This question has been asked before and already has an answer. If those answers do not fully address your question, please ask a new question.



















  • @eliasah I have already seen the question this has been marked as a duplicate of, and I have already mentioned it in my question. I asked this question because the provided solution does not seem to be the most optimised way and is 3 years old.

    – Sumit Kumar
    Nov 26 '18 at 10:09











  • To the person who downvoted the question: can you please comment the reason, so that I can either improve or defend my question?

    – Sumit Kumar
    Nov 26 '18 at 11:50











  • If the duplicate has outdated answers, don't ask a new question; instead you can place a bounty on the older question. "Current answers are outdated" is a valid and preselectable bounty reason. There are also high-rep users who are willing to spend their rep on such things if you ask nicely; you just have to find them in chat.

    – Max Vollmer
    Nov 26 '18 at 22:39


















1 Answer

You can use toDF too. Also, a_rdd and b_rdd are not RDDs in your code, as they are collected!



df = source_rdd.keyBy(lambda row: row[0]).groupByKey()
a_rdd = df.filter(lambda row: row[0] == 'a')
b_rdd = df.filter(lambda row: row[0] == 'b')
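The toDF route mentioned above is not spelled out in the answer; presumably it looks something like this sketch (the column names are illustrative, and an active SparkSession is required), filtering a DataFrame instead of the RDD:

df2 = source_rdd.toDF(['key', 'value'])   # RDD of tuples -> DataFrame
a_df = df2.where(df2.key == 'a')
b_df = df2.where(df2.key == 'b')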





answered Nov 26 '18 at 8:54 by OmG (edited Nov 26 '18 at 10:23)

  • This method still needs to scan the whole DataFrame each time a new condition is required, which I want to avoid.

    – Sumit Kumar
    Nov 26 '18 at 10:14











  • @SumitKumar If you don't want DataFrames, use a filter over the RDD directly (the answer is updated).

    – OmG
    Nov 26 '18 at 10:24











  • Now it is the same as my implementation, which I want to optimise further if possible. Details are in my question body.

    – Sumit Kumar
    Nov 26 '18 at 10:29











  • @SumitKumar Nope, it is not the same. You collect it, but I do not; the filter is applied directly over the RDD.

    – OmG
    Nov 26 '18 at 10:31











  • collect is one of the methodologies; I am already using the second methodology, same as yours, which does not use any collect or groupBy.

    – Sumit Kumar
    Nov 26 '18 at 10:54



















