Count number of users per window using PySpark


























I'm using Kafka to stream a JSON file, sending each line as a message. One of the keys is the user's email.



Then I use PySpark to count the number of unique users per window, using their email to identify them. The command



def print_users_count(count):
    print 'The number of unique users is:', count

print_users_count((lambda message: message['email']).distinct().count())


Gives me the error below. How can I fix this?



AttributeError                            Traceback (most recent call last)
<ipython-input-19-311ba744b41f> in <module>()
2 print 'The number of unique users is:', count
3
----> 4 print_users_count((lambda message: message['email']).distinct().count())

AttributeError: 'function' object has no attribute 'distinct'


Here is my PySpark code:



from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json

try:
    sc.stop()
except:
    pass

sc = SparkContext(appName="KafkaStreaming")
sc.setLogLevel("WARN")

ssc = StreamingContext(sc, 60)

# Define the PySpark consumer.
kafkaStream = KafkaUtils.createStream(ssc, bootstrap_servers, 'spark-streaming2', {topicName:1})

# Parse the incoming data as JSON.
parsed = kafkaStream.map(lambda v: json.loads(v[1]))

# Count the number of messages per batch.
parsed.count().map(lambda x:'Messages in this batch: %s' % x).pprint()

































  • Could you please try this : print_users_count(parsed.map(lambda message: message['email']).distinct().count())
    – Pavithran Ramachandran
    Nov 20 at 18:25












  • Yepp! I got AttributeError: 'TransformedDStream' object has no attribute 'distinct'
    – albus_c
    Nov 20 at 18:29
















json apache-spark pyspark apache-kafka spark-streaming














edited Nov 20 at 20:19









cricket_007











asked Nov 20 at 16:26









albus_c

























1 Answer














You're not applying the lambda function to anything. What is message referencing? Right now the lambda is just that, a function, which is why you're getting AttributeError: 'function' object has no attribute 'distinct'. It is not being applied to any data, so it is not returning any data. You need to apply it to the data that holds the email key, which in your code is the parsed DStream.
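To make the point concrete, here is a small plain-Python sketch that needs no Spark. The sample messages are made up and only stand in for parsed Kafka records.

```python
# A lambda on its own is only a function object; it holds no data.
get_email = lambda message: message['email']
print(hasattr(get_email, 'distinct'))  # False: functions have no .distinct()

# Hypothetical sample messages, standing in for parsed Kafka records.
messages = [
    {'email': 'a@example.com'},
    {'email': 'b@example.com'},
    {'email': 'a@example.com'},
]

# Applying the function to data yields values that can be de-duplicated.
unique_emails = set(get_email(m) for m in messages)
print(len(unique_emails))  # 2
```

The same idea carries over to Spark: the lambda has to be passed to something like map on the stream before any de-duplication can happen.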



See the PySpark docs for pyspark.sql.functions.countDistinct(col, *cols) and pyspark.sql.functions.approx_count_distinct. Both operate on DataFrame columns, so if your data is in a DataFrame they may be a simpler way to get a unique count.






        edited Nov 20 at 18:47

























        answered Nov 20 at 18:39









        ash_huddles






























