java.net.SocketTimeoutException: timeout while listening to meetup open_events streaming API

318 views
Skip to first unread message

DMJessieSP

unread,
Apr 6, 2017, 1:44:28 PM4/6/17
to Meetup API

I am trying to listen to an endless stream from open_events streaming API. I am using retrofit for this. For some reason I get a socketTimeoutException after (what seems to be) a random period of time. My code works for the meetups rsvps streaming API, but no others that I've tried (including open_venues, event_comments and photos). I have read the documentation for the different Streaming APIs, but they seem very similar. I cannot understand the reason for why it works for RSVPs and not any other APIs. It doesn't seem to have to do anything with whether the API uses long polling or chunked transfer encoding, since none of those works except for RSVPs. 


interface:


@GET("/2/open_events")
@Streaming
Observable<ResponseBody> meetupStream();


Code:


meetupService = new Retrofit.Builder()
.baseUrl("http://stream.meetup.com")
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build()
.create(MeetupService.class);

meetupService.meetupStream()
.flatMap(responseBody -> events(responseBody.source()))
.subscribe(System.out::println);



public static Observable<String> events(BufferedSource source) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {

while (!source.exhausted()) {
subscriber.onNext(source.readUtf8Line());
}
} catch (IOException e) {
e.printStackTrace();
subscriber.onError(e);
}
subscriber.onCompleted();
}
});
}

Doug Tangren

unread,
Apr 6, 2017, 1:48:35 PM4/6/17
to meetu...@googlegroups.com
On Thu, Apr 6, 2017 at 5:13 AM, DMJessieSP <sin4d...@hotmail.com> wrote:

I am trying to listen to an endless stream from open_events streaming API. I am using retrofit for this. For some reason I get a socketTimeoutException after (what seems to be) a random period of time. My code works for the meetups rsvps streaming API, but no others that I've tried (including open_venues, event_comments and photos). I have read the documentation for the different Streaming APIs, but they seem very similar. I cannot understand the reason for why it works for RSVPs and not any other APIs. It doesn't seem to have to do anything with whether the API uses long polling or chunked transfer encoding, since none of those works except for RSVPs. 



It's possible you'll need to adjust a timeout setting in your client. We don't push messages until they are available over the connection. It's likely that you're having better luck with rsvps because they are so frequent but event postings are much less frequent and, depending on what your client's timeout settings are, they are more likely to timeout

You can get a sense for how frequent these get pushed directly through curl.

 

--
--
You received this message because you are subscribed to the Google
Groups "Meetup API" group.
To unsubscribe from this group, send email to
meetup-api+unsubscribe@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/meetup-api?hl=en?hl=en

---
You received this message because you are subscribed to the Google Groups "Meetup API" group.
To unsubscribe from this group and stop receiving emails from it, send an email to meetup-api+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

DMJessieSP

unread,
Apr 13, 2017, 9:06:11 AM4/13/17
to Meetup API
Thank you for your help! This solved the sockettimeoutexception but I now got the EOFexception below instead after several hours of streaming. I know the stream didn't end as it is an endless stream and it isn't an internet connection issue as I tried disconnecting and it led to a java.net.SocketException: Software caused connection abort: recv failed. It happened after streaming open_events for about 4 hours. I tried streaming photos too to see the difference, and for photos it happened after only about 10-15 minutes. What could cause this?

Stacktrace: 

java.io.EOFException
at okio.RealBufferedSource.require(RealBufferedSource.java:64)
at okio.RealBufferedSource.readHexadecimalUnsignedLong(RealBufferedSource.java:270)
at okhttp3.internal.http1.Http1Codec$ChunkedSource.readChunkSize(Http1Codec.java:444)
at okhttp3.internal.http1.Http1Codec$ChunkedSource.read(Http1Codec.java:425)
at okio.RealBufferedSource.read(RealBufferedSource.java:50)
at okio.ForwardingSource.read(ForwardingSource.java:35)
at retrofit2.OkHttpCall$ExceptionCatchingRequestBody$1.read(OkHttpCall.java:285)
at okio.RealBufferedSource.exhausted(RealBufferedSource.java:60)
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:159)
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:153)
at rx.Observable.unsafeSubscribe(Observable.java:8666)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:250)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147)
at rx.internal.operators.OperatorMap$MapSubscriber.onNext(OperatorMap.java:74)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:505)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:463)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:246)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147)
at rx.internal.operators.OperatorMap$MapSubscriber.onNext(OperatorMap.java:74)
at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:146)
at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:125)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.subscribe(Observable.java:8759)
at rx.Observable.subscribe(Observable.java:8726)
at rx.Observable.subscribe(Observable.java:8549)
at com.example.meetup.MeetupListener.<init>(MeetupListener.java:144)
at com.example.meetup.MeetupApplication.main(MeetupApplication.java:16)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Exception in thread "main" rx.exceptions.OnErrorNotImplementedException
at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386)
at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383)
at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44)
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:157)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120)
at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:268)
at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:812)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:573)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:562)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onError(OperatorMerge.java:846)
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:170)
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:153)
at rx.Observable.unsafeSubscribe(Observable.java:8666)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:250)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147)
at rx.internal.operators.OperatorMap$MapSubscriber.onNext(OperatorMap.java:74)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:505)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:463)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:246)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147)
at rx.internal.operators.OperatorMap$MapSubscriber.onNext(OperatorMap.java:74)
at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:146)
at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:125)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.subscribe(Observable.java:8759)
at rx.Observable.subscribe(Observable.java:8726)
at rx.Observable.subscribe(Observable.java:8549)
at com.example.meetup.MeetupListener.<init>(MeetupListener.java:144)
at com.example.meetup.MeetupApplication.main(MeetupApplication.java:16)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.io.EOFException
at okio.RealBufferedSource.require(RealBufferedSource.java:64)
at okio.RealBufferedSource.readHexadecimalUnsignedLong(RealBufferedSource.java:270)
at okhttp3.internal.http1.Http1Codec$ChunkedSource.readChunkSize(Http1Codec.java:444)
at okhttp3.internal.http1.Http1Codec$ChunkedSource.read(Http1Codec.java:425)
at okio.RealBufferedSource.read(RealBufferedSource.java:50)
at okio.ForwardingSource.read(ForwardingSource.java:35)
at retrofit2.OkHttpCall$ExceptionCatchingRequestBody$1.read(OkHttpCall.java:285)
at okio.RealBufferedSource.exhausted(RealBufferedSource.java:60)
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:159)
... 30 more

For more options, visit this group at
http://groups.google.com/group/meetup-api?hl=en?hl=en

---
You received this message because you are subscribed to the Google Groups "Meetup API" group.
To unsubscribe from this group and stop receiving emails from it, send an email to meetup-api+...@googlegroups.com.
Reply all
Reply to author
Forward
0 new messages