AvroTypeException: Expected record-start. Got VALUE_STRING

Ambarish Hazarnis

Apr 2, 2014, 4:56:45 PM
to camu...@googlegroups.com, yya...@paypal.com, ahaz...@ebay.com
Hello folks,
Our team is starting out with Camus. I have created a Kafka producer that publishes messages built according to the DummyLog schema.

I can see the messages in the console consumer. However, when I run CamusJob.java, I get the error below.

Error:
topic=DUMMY_LOGA partition=0leaderId=1 server= service= beginOffset=5 offset=6 server= checksum=75240036 time=1396471057691
java.io.IOException: java.lang.RuntimeException: org.apache.avro.AvroTypeException: Expected record-start. Got VALUE_STRING
at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.getWrappedRecord(EtlRecordReader.java:128)
at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.nextKeyValue(EtlRecordReader.java:254)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:532)
at org.apache.hadoop.mapreduce.MapContext.nextKeyValue(MapContext.java:67)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:143)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)
Caused by: java.lang.RuntimeException: org.apache.avro.AvroTypeException: Expected record-start. Got VALUE_STRING
at com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder.decode(LatestSchemaKafkaAvroMessageDecoder.java:41)
at com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder.decode(LatestSchemaKafkaAvroMessageDecoder.java:12)
at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.getWrappedRecord(EtlRecordReader.java:125)
... 7 more
Caused by: org.apache.avro.AvroTypeException: Expected record-start. Got VALUE_STRING
at org.apache.avro.io.JsonDecoder.error(JsonDecoder.java:697)
at org.apache.avro.io.JsonDecoder.doAction(JsonDecoder.java:490)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.JsonDecoder.advance(JsonDecoder.java:139)
at org.apache.avro.io.JsonDecoder.readLong(JsonDecoder.java:178)
at org.apache.avro.io.ResolvingDecoder.readLong(ResolvingDecoder.java:159)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:173)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:135)
at com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder.decode(LatestSchemaKafkaAvroMessageDecoder.java:26)

I am using the following command to run CamusJob:
 java -cp /home/ahazarnis/.m2/repository/commons-httpclient/commons-httpclient/3.1/commons-httpclient-3.1.jar:target/camus-etl-kafka-0.1.0-SNAPSHOT-jar-with-dependencies.jar com.linkedin.camus.etl.kafka.CamusJob -P my.properties


Code for Kafka Producer:

  Map<CharSequence, CharSequence> m = new HashMap<CharSequence, CharSequence>();
  m.put("a", "c");
  DummyLog dummy = DummyLog.newBuilder().setId(Long.valueOf(1)).setLogTime(Long.valueOf(1124)).setMuchoStuff(m).build();
......
...... 

  ByteArrayOutputStream out = new ByteArrayOutputStream();
  GenericDatumWriter<DummyLog> writer = new GenericDatumWriter<DummyLog>(dummy.getSchema());
  JsonEncoder encoder = EncoderFactory.get().jsonEncoder(dummy.getSchema(), out);

  writer.write(dummy, encoder);
  encoder.flush();
  out.close();
  String msg = out.toString();
  KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, msg);
.....
.....




I have set the following properties:
#Concrete implementation of the Decoder class to use
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder

# Used by avro-based Decoders to use as their Schema Registry
kafka.message.coder.schema.registry.class=com.linkedin.camus.example.schemaregistry.DummySchemaRegistry

DummySchemaRegistry.java:

public class DummySchemaRegistry extends MemorySchemaRegistry<Schema> {
  public DummySchemaRegistry() {
    super();
    super.register("DUMMY_LOGA", DummyLog.SCHEMA$);
  }
}



Let me know if you need any additional information. Awaiting your response.





Thank you,
Ambarish
eBay Inc.

Gaurav Gupta

Apr 2, 2014, 5:47:49 PM
to Ambarish Hazarnis, camu...@googlegroups.com, yya...@paypal.com, ahaz...@ebay.com
Hey Ambarish,

Not sure what's going wrong here, but it seems to be a JSON decoding issue, probably a schema mismatch:
Caused by: org.apache.avro.AvroTypeException: Expected record-start. Got VALUE_STRING
at org.apache.avro.io.JsonDecoder.error(JsonDecoder.java:697)
at org.apache.avro.io.JsonDecoder.doAction(JsonDecoder.java:490)

Can you add some more logging to LatestSchemaKafkaAvroMessageDecoder.java and check which schema is being used?

Thanks,
Gaurav
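
The logging suggested above could start with a look at what the decoder actually receives. The following is a minimal sketch under stated assumptions, not Camus or Avro code: the class `PayloadInspector` and its methods `describe` and `hexPrefix` are hypothetical names, it uses only the JDK, and it assumes the decoder is handed the message payload as a `byte[]`. Avro's JSON encoding writes a record as a JSON object, so text that does not begin with `{` would be one possible explanation for "Expected record-start".

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical debugging helper; not part of Camus or Avro. */
final class PayloadInspector {

    private PayloadInspector() {}

    /** Classifies the payload by its first non-whitespace byte. */
    static String describe(byte[] payload) {
        int i = 0;
        // Skip leading JSON whitespace (space, tab, LF, CR).
        while (i < payload.length && isJsonWhitespace(payload[i])) {
            i++;
        }
        if (i == payload.length) {
            return "EMPTY";
        }
        byte first = payload[i];
        if (first == '{') {
            return "JSON_OBJECT";   // what a JSON-encoded Avro record starts with
        }
        if (first == '"') {
            return "JSON_STRING";   // a JSON parser would report a string token here
        }
        // Java bytes are signed, so values above 0x7F show up as negative.
        if (first < 0x20 || first > 0x7E) {
            return "BINARY";
        }
        return "OTHER_TEXT";
    }

    /** Hex dump of at most maxBytes leading bytes, for log output. */
    static String hexPrefix(byte[] payload, int maxBytes) {
        StringBuilder sb = new StringBuilder();
        int n = Math.min(maxBytes, payload.length);
        for (int i = 0; i < n; i++) {
            if (i > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", payload[i] & 0xFF));
        }
        return sb.toString();
    }

    private static boolean isJsonWhitespace(byte b) {
        return b == ' ' || b == '\t' || b == '\n' || b == '\r';
    }

    public static void main(String[] args) {
        // Illustrative payload only; real payloads come from the Kafka message.
        byte[] sample = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);
        System.out.println(describe(sample) + " [" + hexPrefix(sample, 4) + "]");
    }
}
```

Logging `describe(payload)` and `hexPrefix(payload, 16)` next to the schema name, just before the payload is passed to the JSON decoder, shows whether the bytes are the same JSON text the console consumer prints or something shifted, wrapped, or binary.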


Yang, Yang Yang

Apr 2, 2014, 5:56:56 PM
to Gaurav Gupta, Ambarish Hazarnis, camu...@googlegroups.com, Hazarnis, Ambarish
By the way, we hooked up a console consumer, and it correctly shows the JSON objects sent by our producer.

Tracy Zhang

Jun 17, 2014, 8:50:21 PM
to camu...@googlegroups.com, yya...@paypal.com, ahaz...@ebay.com
Did you solve this problem? I am facing the same problem now.

alvi...@gmail.com

Oct 9, 2014, 4:15:43 PM
to camu...@googlegroups.com, yya...@paypal.com, ahaz...@ebay.com
Hi guys,

I have the same issue here.
Does anyone have a solution for it?

Thanks.
[CamusJob] - java.io.IOException: java.lang.RuntimeException: org.apache.avro.AvroTypeException: Expected record-start. Got VALUE_STRING
at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.getWrappedRecord(EtlRecordReader.java:135)
at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.nextKeyValue(EtlRecordReader.java:261)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:483)
at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:76)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:85)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:139)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
at org.apache.hadoop.mapred.Child.main(Child.java:262)
Caused by: java.lang.RuntimeException: org.apache.avro.AvroTypeException: Expected record-start. Got VALUE_STRING
at com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder.decode(LatestSchemaKafkaAvroMessageDecoder.java:41)
at com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder.decode(LatestSchemaKafkaAvroMessageDecoder.java:12)
at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.getWrappedRecord(EtlRecordReader.java:132)
... 12 more
Caused by: org.apache.avro.AvroTypeException: Expected record-start. Got VALUE_STRING
at org.apache.avro.io.JsonDecoder.error(JsonDecoder.java:697)
at org.apache.avro.io.JsonDecoder.doAction(JsonDecoder.java:490)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.JsonDecoder.advance(JsonDecoder.java:139)
at org.apache.avro.io.JsonDecoder.readLong(JsonDecoder.java:178)
at org.apache.avro.io.ResolvingDecoder.readLong(ResolvingDecoder.java:159)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:177)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
at com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder.decode(LatestSchemaKafkaAvroMessageDecoder.java:26)
... 14 more

Error from file [hdfs://localhost:8020/user/cloudera/camus_kafka_etl/base/2014-10-09-20-05-05/errors-m-00000]

Yang Yang

Feb 5, 2015, 6:34:56 PM
to camu...@googlegroups.com, yya...@paypal.com, ahaz...@ebay.com
We went back to an older version of Kafka and Camus, and it worked. Kafka was something like 0.7.0, but sorry, I don't remember the Camus version.

Ambarish Hazarnis

Feb 9, 2015, 4:13:09 PM
to camu...@googlegroups.com, yya...@paypal.com, ahaz...@ebay.com
Hi,

Basically, you have to encode your data before sending it over Kafka ....