KiteSDK: Unexpected record ordering in a dataset


Qian Xu

May 5, 2015, 1:47:59 PM
to cdk...@cloudera.org
Dear Kite Folks,

I wrote a unit test that does the following:
1. Creates a Hive dataset (e.g. dataset:hive:default/test1) with 2 records.
2. Appends 2 new records to this dataset.
3. Verifies all records. The actual record order is 3, 4, 1, 2; in other words, the newly appended records are read first.

Is this expected behavior, or have I done something wrong?

Corresponding code snippet:
{code}
  // Code to configure a MR job
  public static void configureImportJob(Configuration conf, Schema schema,
      String uri, WriteMode writeMode) throws IOException {
    Dataset dataset;
    if (Datasets.exists(uri)) {
      if (WriteMode.DEFAULT.equals(writeMode)) {
        throw new IOException("Destination exists! " + uri);
      }

      dataset = Datasets.load(uri);
      Schema writtenWith = dataset.getDescriptor().getSchema();
      if (!SchemaValidationUtil.canRead(writtenWith, schema)) {
        throw new IOException(
            String.format("Expected schema: %s%nActual schema: %s",
                writtenWith, schema));
      }
    } else {
      dataset = createDataset(schema, getCompressionType(conf), uri);
    }
    conf.set(CONF_AVRO_SCHEMA, schema.toString());

    DatasetKeyOutputFormat.ConfigBuilder builder =
        DatasetKeyOutputFormat.configure(conf);
    if (WriteMode.OVERWRITE.equals(writeMode)) {
      builder.overwrite(dataset);
    } else if (WriteMode.APPEND.equals(writeMode)) {
      builder.appendTo(dataset);
    } else {
      builder.writeTo(dataset);
    }
  }

  // Code in the unit test to read records back and compare them in order
  private void verifyHiveDataset(String tableName, Object[][] valsArray) {
    String datasetUri = String.format("dataset:hive:default/%s",
        tableName.toLowerCase());
    assertTrue(Datasets.exists(datasetUri));
    Dataset dataset = Datasets.load(datasetUri);
    assertFalse(dataset.isEmpty());

    DatasetReader<GenericRecord> reader = dataset.newReader();
    try {
      for (Object[] vals : valsArray) {
        assertTrue("Expect record: " + Arrays.toString(vals), reader.hasNext());
        GenericRecord record = reader.next();
        assertNotNull(record);
        for (int i = 0; i < vals.length; i++) {
          assertEquals(vals[i], record.get(i));
        }
      }
      assertFalse(reader.hasNext());
    } finally {
      reader.close();
    }
  }
{code}

I'm using Kite SDK 1.0.0.

Thanks
Stanley (Xu, Qian)

Joey Echeverria

May 5, 2015, 6:40:11 PM
to Qian Xu, cdk...@cloudera.org
I'd describe it as not unexpected, since the read order isn't defined.
Looking at the implementation, the files are iterated over in
alphabetical order. Since files are named with a random UUID, the
resulting order is effectively random.
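
Since the read order isn't defined, one option for the unit test is to compare the expected and actual records without regard to order. Below is a minimal sketch of that idea using plain Java collections rather than the Kite API. The class and method names (UnorderedRecordCheck, canonical, sameRecords) are hypothetical, and it assumes each record has already been copied into an Object[] of field values whose string forms are distinct enough to compare.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Kite: order-insensitive record comparison.
public class UnorderedRecordCheck {

  // Renders each row as a string and sorts the strings, producing a
  // canonical form that does not depend on the order the rows were read in.
  // Assumption: Arrays.toString(row) distinguishes the rows well enough
  // (note that 1 and "1" render identically).
  public static List<String> canonical(List<Object[]> rows) {
    List<String> out = new ArrayList<>();
    for (Object[] row : rows) {
      out.add(Arrays.toString(row));
    }
    Collections.sort(out);
    return out;
  }

  // True when both lists hold the same rows (duplicates included),
  // regardless of order.
  public static boolean sameRecords(List<Object[]> expected,
                                    List<Object[]> actual) {
    return canonical(expected).equals(canonical(actual));
  }

  public static void main(String[] args) {
    List<Object[]> expected = Arrays.<Object[]>asList(
        new Object[] {1, "a"}, new Object[] {2, "b"},
        new Object[] {3, "c"}, new Object[] {4, "d"});
    // Same rows in the order reported above: 3, 4, 1, 2.
    List<Object[]> actual = Arrays.<Object[]>asList(
        new Object[] {3, "c"}, new Object[] {4, "d"},
        new Object[] {1, "a"}, new Object[] {2, "b"});
    System.out.println(sameRecords(expected, actual));
  }
}
```

In a test like verifyHiveDataset, the rows read from the DatasetReader would be collected into a list first and then checked with sameRecords instead of being compared one by one in read order.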

Do you have a use case that requires the data to be ordered by insertion time?

-Joey



--
Joey Echeverria
Senior Infrastructure Engineer

Qian Xu

May 6, 2015, 1:31:04 AM
to cdk...@cloudera.org, sx....@googlemail.com
Hi Joey, thanks for the explanation! 

The use case is exporting records from a MySQL (or any other relational database) table to Hive. I think I should add an indexable column to manage record ordering.
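
A rough sketch of the index column idea, assuming each exported record carries a non-null numeric index (for example the source table's primary key) at a known field position. The class name IndexColumnOrdering and the method sortByIndex are hypothetical, and the rows are plain Object[] arrays rather than Kite or Avro records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper, not part of Kite: restores a stable order after
// reading by sorting on an index column carried in each record.
public class IndexColumnOrdering {

  // Returns a new list sorted ascending by the numeric value found at
  // position indexColumn. Assumption: that field is a non-null Number.
  public static List<Object[]> sortByIndex(List<Object[]> rows,
                                           int indexColumn) {
    List<Object[]> sorted = new ArrayList<>(rows);
    sorted.sort(Comparator.comparingLong(
        (Object[] row) -> ((Number) row[indexColumn]).longValue()));
    return sorted;
  }

  public static void main(String[] args) {
    // Rows in the order the reader happened to return them: 3, 4, 1, 2.
    List<Object[]> asRead = Arrays.<Object[]>asList(
        new Object[] {3L, "c"}, new Object[] {4L, "d"},
        new Object[] {1L, "a"}, new Object[] {2L, "b"});
    for (Object[] row : sortByIndex(asRead, 0)) {
      System.out.println(Arrays.toString(row));
    }
  }
}
```

On the Hive side, the same column could be used in an ORDER BY clause when the insertion order matters to a query.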

Thanks
Stanley (Xu, Qian)