Thread safety of Vert.x event loops whose handlers modify a shared resource


Rok HIk

Nov 16, 2016, 1:34:34 AM
to vert.x

The following code emulates concurrent modification of a shared resource, the file test.txt, by handlers running on Vert.x event loops. On that link there's a note:

Even though a Vertx instance maintains multiple event loops, any particular handler will never be executed concurrently, and in most cases (with the exception of worker verticles) will always be called using the exact same event loop.

So I expected the file to contain sequential output from the handlers of each event loop. Instead, the lines were interleaved, so this note does not imply thread safety. What does it imply? And how can I achieve thread safety in this or an equivalent case without synchronizing access to the shared resource?


package org.openlso.rabbitmq;

import java.io.IOException;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.openlso.rabbitmq.configuration.RabbitMQBrokerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;

import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.rabbitmq.RabbitMQClient;

@RunWith(VertxUnitRunner.class)
public class RabbitmqvertxApplicationTests {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitmqvertxApplicationTests.class);
    private Vertx vertx;

    @Before
    public void setUp(TestContext context) {
        vertx = Vertx.vertx();
    }

    @After
    public void tearDown(TestContext context) {
        vertx.close(context.asyncAssertSuccess());
    }

    @Test
    public void getDo(TestContext context) {


        final Async async = context.async();

        for (int i = 0; i < 100; i++) {

            vertx.createHttpClient().getNow(8090, "127.0.0.1", "/someapi?Message=" + "a", response -> {
                try {
                    Utils.dosmth("REST");
                } catch (IOException e) {
                    e.printStackTrace();
                }
                System.out.println("got response " + response.toString());
            });
        }

        JsonObject config = new JsonObject();
        config.put("user", "admin");
        config.put("password", "admin");
        config.put("host", "localhost");
        config.put("port", 5672);
        config.put("connectionTimeout", 60);
        RabbitMQClient client = RabbitMQClient.create(vertx, config);
        client.start(res -> {
            if (res.succeeded()) {
                System.out.println("connection to rabbit succeeded");
                for (int i = 0; i < 100; i++) {

                    JsonObject json = new JsonObject("{\"body\":\"PRODUCER\"}");
                    client.basicPublish("vertxPaymentsExchnage", "vertxPayment.direct", json, pubResult -> {
                        if (pubResult.succeeded()) {
                            System.out.println("Message published !");
                            try {
                                Utils.dosmth("PRODUCER");
                            } catch (IOException e) {
                                e.printStackTrace();
                            }

                        } else {
                            System.out.println("Message not published !");
                            pubResult.cause().printStackTrace();
                        }
                    });

                }
                client.stop(context.asyncAssertSuccess());
                async.complete();
            }
            else {
                System.out.println("connection to rabbit failed");
                res.cause().printStackTrace();
            }
        });

    }

}

package org.openlso.rabbitmq;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.nio.file.StandardOpenOption.*;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class Utils {

    private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class);

    public static String dosmth(String msg) throws IOException {
        LOGGER.info("inside dosmth " + msg);
        String[] strArr = new String[10000];
        strArr[0] = "************************************************************************************\n\n";
        for (int i = 1; i < strArr.length; i++) {
            strArr[i] = msg + " " + Thread.currentThread().getName() + " " + i;
        }
        Files.write(Paths.get("test.txt"), Arrays.asList(strArr), UTF_8, APPEND, CREATE);
        return "hi";
    }

}

Thomas SEGISMONT

Nov 16, 2016, 4:03:47 AM
to ve...@googlegroups.com
Hi,

So the code you're executing is not even organized 

Thomas SEGISMONT

Nov 16, 2016, 4:08:52 AM
to ve...@googlegroups.com
Sorry, wrong key combo sent the email... I'm bad at typing :)


So the code you're executing is not even organized as verticles. In this case, there is no guarantee whatsoever about which event loop handles the event (in your case, the HTTP client response).
Besides, you are calling blocking code (Files#write in method dosmth) from a Vert.x event loop, and this is not OK (never block the event loop).

If you want to write to the same file, I suggest you create an AsyncFile with the Vert.x FileSystem and append data with it.
And organize this logic as a verticle, so that a single owner handles all writes to the file.
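
The general idea behind that advice (one owner serializes all access to the shared file, so no locks are needed) can be sketched with plain JDK classes. This is only an illustration under assumptions: the class SingleWriterDemo and its methods are hypothetical, the single-thread executor merely stands in for one event loop that owns the file, and the sketch does not use the Vert.x AsyncFile API.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;

// Hypothetical demo class: plain JDK only, no Vert.x API involved.
class SingleWriterDemo {

    // Builds one block of lines, similar in spirit to Utils.dosmth.
    static List<String> block(String msg, int lines) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < lines; i++) {
            out.add(msg + " " + i);
        }
        return out;
    }

    // Appends one complete block; only ever called on the writer thread.
    static void append(Path file, List<String> lines) {
        try {
            Files.write(file, lines, UTF_8, CREATE, APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Producer threads build blocks concurrently but hand each finished block
    // to a single writer thread, which is the only thread touching the file.
    static void run(Path file, int producers, int blocksPerProducer, int linesPerBlock) {
        ExecutorService writer = Executors.newSingleThreadExecutor();
        ExecutorService pool = Executors.newFixedThreadPool(producers);
        try {
            for (int p = 0; p < producers; p++) {
                final String name = "producer-" + p;
                pool.submit(() -> {
                    for (int b = 0; b < blocksPerProducer; b++) {
                        List<String> lines = block(name + " block-" + b, linesPerBlock);
                        writer.execute(() -> append(file, lines));
                    }
                });
            }
            // Wait for all producers first, then let the writer drain its queue.
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
            writer.shutdown();
            writer.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // True if the file consists of whole blocks whose lines are not interleaved.
    static boolean blocksAreContiguous(List<String> all, int linesPerBlock) {
        if (all.size() % linesPerBlock != 0) {
            return false;
        }
        for (int start = 0; start < all.size(); start += linesPerBlock) {
            String first = all.get(start);
            String prefix = first.substring(0, first.lastIndexOf(' ') + 1);
            for (int i = 0; i < linesPerBlock; i++) {
                if (!all.get(start + i).equals(prefix + i)) {
                    return false;
                }
            }
        }
        return true;
    }

    // Runs the demo against a temporary file and returns the lines written.
    static List<String> runOnTempFile(int producers, int blocksPerProducer, int linesPerBlock) {
        try {
            Path file = Files.createTempFile("single-writer", ".txt");
            try {
                run(file, producers, blocksPerProducer, linesPerBlock);
                return Files.readAllLines(file, UTF_8);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> all = runOnTempFile(4, 25, 200);
        System.out.println("lines written: " + all.size());
        System.out.println("blocks contiguous: " + blocksAreContiguous(all, 200));
    }
}
```

The order in which blocks land in the file is still arbitrary, but each block stays intact because exactly one thread performs the writes. In a Vert.x application the same role would presumably be played by a single verticle that owns the file and receives the data to append from the other handlers.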


