Spring Cloud Stream Reactive - How to do the Error Handling in case of reactive stream pipeline?
How do I handle errors in a reactive stream pipeline? In particular:
- Application error handling (e.g. errorChannel)
- System error handling (working with a DLQ, reprocessing, etc.)
The current documentation only describes error handling for the non-reactive pipeline.
https://docs.spring.io/spring-cloud-stream/docs/Fishtown.BUILD-SNAPSHOT/reference/htmlsingle/#_application_error_handling
Spring Cloud Stream makes error handling easy by providing simple configuration for the common scenarios. It would be great if the same error-handling use cases (with the same configuration) worked for the reactive stream pipeline as well. The use cases and their configuration are listed below:
- @StreamListener("errorChannel") annotation for global error handling
- @KafkaListener(id="bar", topics = "reactive-stream-error-topic") for reading failed messages back from the DLQ topic
- Configuration for the DLQ, so that failed messages are produced to the error topic:
spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true
spring.cloud.stream.kafka.bindings.input.consumer.dlqName=reactive-stream-error-topic
The example from the documentation works fine with a non-reactive Spring Cloud Stream application, but the same setup gives an error for the reactive pipeline. Any guidance in this direction would be of great help to the community. Thanks in advance!
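import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;

@SpringBootApplication
@EnableBinding(Sink.class)
public class ReactiveStreamSinkApplication {

    public static void main(String[] args) {
        SpringApplication.run(ReactiveStreamSinkApplication.class, args);
    }

    @StreamListener
    public void receive(@Input(Sink.INPUT) Flux<String> inputFlux) {
        inputFlux.subscribe(System.out::println);
        // Simulated failure
        throw new RuntimeException("BOOM!");
    }

    // Global error handler
    @StreamListener("errorChannel")
    public void error(Message<?> message) {
        // log the error msg
        System.out.println("Handling ERROR: " + message);
    }

    // Reads failed messages back from the DLQ topic
    @KafkaListener(id="bar", topics = "reactive-stream-error-topic")
    public void error(String in) {
        System.out.println(in + " from DLQ");
    }
}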
spring-cloud-stream spring-kafka
edited Nov 28 '18 at 10:04
Mansingh Shitole
asked Nov 22 '18 at 21:34
Mansingh Shitole
1 Answer
Sorry for the late reply.
First, there is a problem in your code, and in how you expect an exception to be thrown from the reactive stream. You are dealing with a declarative handler, which is treated quite differently. In your code, the receive method is invoked only once, during startup and initialization. So throwing an exception from it is not at all equivalent to an exception thrown during stream processing, which is the case the error handling mechanism you are asking about was designed for.
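To make the distinction concrete, here is a minimal plain-JDK sketch. It is an analogy only: TinyStream, receive and demo are hypothetical names, not Reactor or Spring Cloud Stream types, and the sketch does not model how a real Flux cancels its subscription after a failure. It only shows that the handler body runs once to wire the pipeline, while the callback it registers runs once per value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-JDK analogy only: TinyStream is a hypothetical stand-in for a reactive
// source, not a Reactor or Spring Cloud Stream type.
final class WiringVersusProcessing {

    /** Remembers a single subscriber and pushes each emitted value to it. */
    static final class TinyStream {
        private Consumer<String> subscriber = value -> { };

        void subscribe(Consumer<String> subscriber) {
            this.subscriber = subscriber;
        }

        void emit(String value) {
            subscriber.accept(value);
        }
    }

    /** Plays the role of the declarative handler: runs once and only wires the pipeline. */
    static void receive(TinyStream input, List<String> log) {
        log.add("wired");
        input.subscribe(value -> {
            if ("foo".equals(value)) {
                throw new IllegalStateException("BOOM!"); // processing-time failure
            }
            log.add("processed " + value);
        });
    }

    /** Wires the handler once, then emits the given values and records what happened. */
    static List<String> demo(List<String> values) {
        List<String> log = new ArrayList<>();
        TinyStream input = new TinyStream();
        receive(input, log); // startup: invoked exactly once
        for (String value : values) {
            try {
                input.emit(value);
            } catch (IllegalStateException e) {
                log.add("failed " + value);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo(List.of("a", "foo", "b")));
    }
}
```

The log contains a single "wired" entry no matter how many values are emitted. An exception thrown directly in the body of receive would therefore surface during wiring, before any message is processed.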
That aside, with the introduction of the Spring Cloud Function programming model, we are considering shifting attention away from the reactive module altogether, since Spring Cloud Function already supports the reactive programming model.
So consider the following:
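import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;

@SpringBootApplication
@EnableBinding(Sink.class)
public class ReactiveStreamSinkApplication {

    public static void main(String[] args) {
        // Selects the myconsumer bean as the function bound to the input
        SpringApplication.run(ReactiveStreamSinkApplication.class,
                "--spring.cloud.stream.function.definition=myconsumer");
    }

    @Bean
    public Consumer<Flux<String>> myconsumer() {
        // The exception is now thrown while a value is being processed
        return stream -> stream.subscribe(value -> {
            if ("foo".equals(value)) {
                throw new RuntimeException("BOOM!");
            }
            System.out.println("Received value: " + value);
        });
    }

    @StreamListener("errorChannel")
    public void error(Message<?> message) {
        // log the error msg
        System.out.println("Handling ERROR: " + message);
    }
}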
Try that and let us know.
Excited to try out the Spring Cloud Function in the pipeline. I will try your suggestion and keep you updated on the test results. Thank you!
– Mansingh Shitole
Dec 5 '18 at 12:42
It works partially. I can see the error message in my @StreamListener("errorChannel") method, but the failed message doesn't go to the DLQ, i.e. the error topic. The same configuration works fine in the non-reactive stream application.
– Mansingh Shitole
Dec 6 '18 at 16:07
In addition, a new error has popped up. The exception is: java.lang.ClassCastException: ReactiveStreamSinkApplication$$Lambda cannot be cast to java.util.function.Function. This is caused by the SCF return type Consumer<Flux<String>>. It worked when I changed the return type to one without Flux, i.e. Consumer<String>. This seems like a big problem, since we would like to use Flux for a more declarative programming style inside the myconsumer() method. Could you please look into this? Thank you!
– Mansingh Shitole
Dec 6 '18 at 16:20
@MansinghShitole please see github.com/spring-cloud/spring-cloud-stream/issues/1539 as the issue was fixed. Give it a try and let us know.
– Oleg Zhurakousky
Dec 10 '18 at 17:04
Thanks for the update. It works! No more ClassCastException, and I can use Consumer<Flux<String>>. Unfortunately, error handling stopped working with this fix. Before (without Flux), after the RuntimeException was thrown, the error propagated back to the errorChannel. Now the Flux passed to the Consumer just gets cancelled and the flow stops there. I think it's a bug, or we need to find another way to handle this. Could you please look into it? Thank you.
– Mansingh Shitole
Dec 14 '18 at 15:02
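One general way to keep a pipeline alive in the situation described in the last comment is to catch the failure per value and hand the failed value to a separate handler, instead of letting the exception escape the callback. The sketch below shows that pattern in plain JDK code. It is an assumption-laden illustration, not the fix discussed in this thread: guarded, demo and the deadLetters list are hypothetical names, and nothing here talks to the errorChannel or to the binder's DLQ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Plain-JDK sketch of per-value error isolation; all names are hypothetical.
final class GuardedConsumerSketch {

    /** Wraps a consumer so a failure on one value is reported instead of escaping. */
    static <T> Consumer<T> guarded(Consumer<T> delegate, BiConsumer<T, RuntimeException> onFailure) {
        return value -> {
            try {
                delegate.accept(value);
            } catch (RuntimeException e) {
                onFailure.accept(value, e);
            }
        };
    }

    /** Returns two lists: the processed values, then the values routed to the dead-letter list. */
    static List<List<String>> demo(List<String> values) {
        List<String> processed = new ArrayList<>();
        List<String> deadLetters = new ArrayList<>();
        Consumer<String> handler = guarded(
                value -> {
                    if ("foo".equals(value)) {
                        throw new RuntimeException("BOOM!");
                    }
                    processed.add(value);
                },
                (value, error) -> deadLetters.add(value));
        values.forEach(handler);
        return List.of(processed, deadLetters);
    }

    public static void main(String[] args) {
        System.out.println(demo(List.of("a", "foo", "b")));
    }
}
```

In a reactive consumer the wrapped handler would presumably be the lambda passed to subscribe, so that a bad value no longer terminates the subscription. Whether the failed value can then be forwarded to the DLQ topic depends on the binder and is not settled by this thread.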
answered Nov 29 '18 at 13:29
Oleg Zhurakousky