How to use PHP to send JSON data to kafka?

713 views
Skip to first unread message

Shrinivasan T

unread,
Jun 14, 2016, 3:21:17 PM6/14/16
to Confluent Platform
I am trying to send the following json from my php code to kafka.



{ "records": [
  {
    "value": {
      "id": 1,
      "name": "A green door",
      "price": 12.50,
      "tags": ["home", "green"]
    }
  }
]}


Here is the code:

==code start ==
<?php
$data  = '{ "records": [{"value": {"id": 1,"name": "A green door","price": 12.50,"tags": ["home", "green"]}}]}';

print $data;

// set up the curl resource
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, "http://localhost:8082/topics/logstash_logs_test");
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "POST");
curl_setopt($ch, CURLOPT_POST, true);
curl_setopt($ch, CURLOPT_POSTFIELDS, $data);
curl_setopt($ch, CURLOPT_HEADER, true);
curl_setopt($ch, CURLOPT_HTTPHEADER, array(
    'Content-Type: application/vnd.kafka.v1+json',
    'Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json',

    'Content-Length: ' . strlen($data)
));

$output = curl_exec($ch);

echo($output) . PHP_EOL;

curl_close($ch);

?>


==code end==




Got the following error on console.
$php rest.php

HTTP/1.1 500 Internal Server Error
Date: Tue, 14 Jun 2016 19:19:40 GMT
Content-Type: application/vnd.kafka.v1+json
Content-Length: 52
Server: Jetty(9.2.12.v20150709)

{"error_code":500,"message":"Internal Server Error"}



On kafka-rest console, got this error.





 [2016-06-14 19:11:19,480] ERROR Unhandled exception resulting in internal server error response (io.confluent.rest.exceptions.GenericExceptionMapper:37)
com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_OBJECT token
 at [Source: org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream@4aaa92e8; line: 1, column: 16] (through reference chain: io.confluent.kafkarest.entities.TopicProduceRequest["records"]->java.util.ArrayList[0]->io.confluent.kafkarest.entities.BinaryTopicProduceRecord["value"])
    at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)
    at com.fasterxml.jackson.databind.DeserializationContext.mappingException(DeserializationContext.java:835)
    at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:59)
    at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:12)
    at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:523)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeWithErrorWrapping(BeanDeserializer.java:461)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:381)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1073)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:295)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:142)
    at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:245)
    at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:217)
    at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:25)
    at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:523)
    at com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:95)
    at com.fasterxml.jackson.databind.deser.impl.BeanPropertyMap.findDeserializeAndSet(BeanPropertyMap.java:285)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:248)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:136)
    at com.fasterxml.jackson.databind.ObjectReader._bind(ObjectReader.java:1410)
    at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:860)
    at com.fasterxml.jackson.jaxrs.base.ProviderBase.readFrom(ProviderBase.java:810)
    at io.confluent.rest.validation.JacksonMessageBodyProvider.readFrom(JacksonMessageBodyProvider.java:65)
    at org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$TerminalReaderInterceptor.invokeReadFrom(ReaderInterceptorExecutor.java:260)
    at org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$TerminalReaderInterceptor.aroundReadFrom(ReaderInterceptorExecutor.java:236)
    at org.glassfish.jersey.message.internal.ReaderInterceptorExecutor.proceed(ReaderInterceptorExecutor.java:156)
    at org.glassfish.jersey.server.internal.MappableExceptionWrapperInterceptor.aroundReadFrom(MappableExceptionWrapperInterceptor.java:74)
    at org.glassfish.jersey.message.internal.ReaderInterceptorExecutor.proceed(ReaderInterceptorExecutor.java:156)
    at org.glassfish.jersey.message.internal.MessageBodyFactory.readFrom(MessageBodyFactory.java:1085)
    at org.glassfish.jersey.message.internal.InboundMessageContext.readEntity(InboundMessageContext.java:853)
    at org.glassfish.jersey.server.ContainerRequest.readEntity(ContainerRequest.java:270)
    at org.glassfish.jersey.server.internal.inject.EntityParamValueFactoryProvider$EntityValueFactory.provide(EntityParamValueFactoryProvider.java:96)
    at org.glassfish.jersey.server.spi.internal.ParameterValueHelper.getParameterValues(ParameterValueHelper.java:81)
    at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$AbstractMethodParamInvoker.getParamValues(JavaResourceMethodDispatcherProvider.java:127)
    at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:143)
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:99)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:389)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:347)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:102)
    at org.glassfish.jersey.server.ServerRuntime$2.run(ServerRuntime.java:308)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:271)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:267)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:315)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:297)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:267)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:317)
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:291)
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:1140)
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:403)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:386)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:334)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:221)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:808)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:110)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:159)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
    at org.eclipse.jetty.server.Server.handle(Server.java:499)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)
    at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
    at java.lang.Thread.run(Thread.java:745)
[2016-06-14 19:11:19,522] INFO 122.164.118.156 - - [14/Jun/2016:19:11:19 +0000] "POST /topics/logstash_logs_test HTTP/1.1" 500 52  164 (io.confluent.rest-utils.requests:77)



It says as ": Can not deserialize instance of java.lang.String out of START_OBJECT token"


What we have to do on the PHP side to solve this issue?


Thanks.






Ewen Cheslack-Postava

unread,
Jun 14, 2016, 5:23:16 PM6/14/16
to Confluent Platform
I think your content type is wrong. You have:

Content-Type: application/vnd.kafka.v1+json

but I think you want

Content-Type: application/vnd.kafka.json.v1+json

The repetition of json in the Content type is a bit confusing, but this is due to the fact that we need to express both the *embedded format* (first instance, which could also be replaced by "binary" or "avro") as well as the container format (i.e. the entire payload is JSON regardless of whether you embed JSON data, binary, or Avro).


-Ewen


--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/10347cd8-760e-43e0-af03-b9aa01b57518%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Thanks,
Ewen

Shrinivasan T

unread,
Jun 15, 2016, 3:06:40 AM6/15/16
to confluent...@googlegroups.com
Awesome Ewen,

Content-Type: application/vnd.kafka.json.v1+json

Adding this works fine.

If possible, update the docs with the exact content type string.

It will help a lot to all.

In the docs, we can not find such content type string.

Thanks.
Reply all
Reply to author
Forward
0 new messages