Count number of users per window using PySpark
I'm using Kafka to stream a JSON file, sending each line as a message. One of the keys is the user's email.
Then I use PySpark to count the number of unique users per window, using their email to identify them. The command
def print_users_count(count):
    print 'The number of unique users is:', count

print_users_count((lambda message: message['email']).distinct().count())
gives me the error below. How can I fix this?
AttributeError                            Traceback (most recent call last)
<ipython-input-19-311ba744b41f> in <module>()
      2     print 'The number of unique users is:', count
      3
----> 4 print_users_count((lambda message: message['email']).distinct().count())

AttributeError: 'function' object has no attribute 'distinct'
Here is my PySpark code:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
try:
    sc.stop()
except:
    pass
sc = SparkContext(appName="KafkaStreaming")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 60)
# Define the PySpark consumer.
kafkaStream = KafkaUtils.createStream(ssc, bootstrap_servers, 'spark-streaming2', {topicName:1})
# Parse the incoming data as JSON.
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
# Count the number of messages per batch.
parsed.count().map(lambda x:'Messages in this batch: %s' % x).pprint()
json apache-spark pyspark apache-kafka spark-streaming
Could you please try this: print_users_count(parsed.map(lambda message: message['email']).distinct().count())
– Pavithran Ramachandran Nov 20 at 18:25
Yepp! I got AttributeError: 'TransformedDStream' object has no attribute 'distinct'
– albus_c Nov 20 at 18:29
edited Nov 20 at 20:19
cricket_007
asked Nov 20 at 16:26
albus_c
1 Answer
You're not applying the lambda function to anything. What is message referencing? Right now the lambda function is just that, a function, which is why you're getting AttributeError: 'function' object has no attribute 'distinct'. It is not being applied to any data, so it is not returning any data. You need to reference the dataframe (or stream) that contains the email key and apply the function to it.
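To make the difference concrete, here is a minimal plain-Python sketch (no Spark involved) of a bare lambda versus the same lambda applied to data. The sample messages and the names get_email, emails and unique_count are made up for illustration; they are not from the question.

```python
# Hypothetical sample records standing in for the parsed Kafka messages.
messages = [
    {"email": "a@example.com"},
    {"email": "b@example.com"},
    {"email": "a@example.com"},
]

get_email = lambda message: message["email"]

# On its own the lambda is only a function object; it holds no data,
# so it has no 'distinct' attribute to call.
has_distinct = hasattr(get_email, "distinct")

# Applying it to every record produces the emails, which can then be
# de-duplicated and counted.
emails = list(map(get_email, messages))
unique_count = len(set(emails))

print(has_distinct)
print(unique_count)
```

In the question's code the data lives in parsed, so that is the object the function would need to be mapped over.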
See the pyspark docs for pyspark.sql.functions.countDistinct(col, *cols) and pyspark.sql.functions.approx_count_distinct. These should be a simpler way to get a unique count.
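As a rough illustration of the result being asked for, the sketch below counts unique emails per batch in plain Python, with each batch modelled as a list of JSON lines. The function name unique_users_per_batch and the sample batches are assumptions made for this example, and nothing here calls Spark.

```python
import json


def unique_users_per_batch(batches, key="email"):
    """Return the number of distinct `key` values in each batch of JSON lines."""
    counts = []
    for batch in batches:
        # Parse each line and collect the key's values in a set to drop duplicates.
        values = {json.loads(line)[key] for line in batch}
        counts.append(len(values))
    return counts


# Hypothetical batches, each standing in for one 60-second streaming interval.
batches = [
    ['{"email": "a@example.com"}', '{"email": "b@example.com"}', '{"email": "a@example.com"}'],
    ['{"email": "c@example.com"}'],
    [],
]

print(unique_users_per_batch(batches))
```

On the DStream from the question, distinct is an RDD method rather than a DStream method, so the per-batch de-duplication would probably have to go through transform, for example parsed.map(lambda m: m['email']).transform(lambda rdd: rdd.distinct()).count().pprint(). I have not run that against your setup, so treat it as a starting point.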
edited Nov 20 at 18:47
answered Nov 20 at 18:39
ash_huddles