Google Groups no longer supports new Usenet posts or subscriptions. Historical content remains viewable.
Dismiss

problem with multi-threaded server and serialization?

4 views
Skip to first unread message

Mike

unread,
Jan 24, 2003, 3:56:22 PM1/24/03
to
I'm not sure where to go with this code. My goal is to take this code
and drop it into another application to play with clusters. This code
is to create an instance of test passing in an array of objects for
manipulation. The class test creates a server socket and a thread
to listen to the server socket. When a connection happens the thread
puts the new client socket into a synchronized vector that the
main thread will look at when the parent calls test.work(). The work()
method creates a worker (that has its own thread) for each socket
that exists in the vector. When I start this it seems to work and
then I get socket errors.

I'm trying to implement a scatter/gather piece of code.

Does anyone see what I've done wrong?

Mike

----- test.java ------
import java.io.*;
import java.net.*;
import java.util.*;

public class test implements Runnable {
private int serverPort = 0;
private ServerSocket ss = null;
private Vector sockets = null;
private boolean run = false;
private holder arr[] = null;
private int idx = 0;

public test(int port) {
serverPort = port;
try {
ss = new ServerSocket(serverPort);
} catch(Exception e) {
e.printStackTrace();
}
sockets = new Vector();
run = true;
new Thread(this).start();
}

public void work(holder _arr[]) {
arr = _arr;
int i, nsockets = sockets.size();
worker workers[] = new worker[nsockets];

// initialize
System.out.println("nsockets = " + nsockets);
idx = 0;

// wake waiting threads
System.out.println("notifying all waiting processes");
try {
notifyAll();
} catch(Exception e) {
// nothing
}

// start the worker threads
System.out.print("activitating workers...");
for(i = 0; i < nsockets; i++) {
System.out.print(" creating worker " + i);
workers[i] = new worker((Socket) sockets.get(i), th
is, sockets);
System.out.print(" starting worker");
workers[i].start();
System.out.print(" finished worker");
}
System.out.println(" done");

// join the worker threads
System.out.print("joining threads ...");
try {
for(i = 0; i < nsockets; i++) {
System.out.print(" " + i);
workers[i].join();
}
} catch(Exception e) {
e.printStackTrace();
}
System.out.println(" done");

// print the results
for(i = 0; i < arr.length; i++) {
System.out.println("arr[i]=" + arr[i]);
}
}

public synchronized holder next() {
holder h = null;
if(arr == null || arr.length == 0) {
try {
wait();
} catch(Exception e) {
// nothing
}
}
while(idx < arr.length) {
if(arr[idx].evaluated == false) {
return arr[idx++];
}
idx++;
}
idx = 0;
while(idx < arr.length) {
if(arr[idx].evaluated == false) {
return arr[idx++];
}
idx++;
}
return null;
}

public void run() {
Socket s = null;
while(run) {
try {
s = ss.accept();
System.out.println("accepted socket " + s);
sockets.add(s);
new worker(s, this, sockets);
} catch(Exception e) {
e.printStackTrace();
}
}
}

public static void main(String args[]) {
Random r = new Random();
holder a[] = new holder[10];
for(int i = 0; i < a.length; i++) {
a[i] = new holder(r.nextInt(200));
System.out.println("h.val=" + a[i].val);
}
test t = new test(Integer.parseInt(args[0]));
System.out.print("sleeping for five seconds...");
try { Thread.sleep(5000); } catch(Exception e) { e.printSta
ckTrace(); }
System.out.println("starting");
t.work(a);
}
}

class worker extends Thread {
Vector sockets = null;
boolean run = false;
Socket s = null;
ObjectOutputStream oos = null;
ObjectInputStream ois = null;
test test = null;

public worker(Socket _s, test _test, Vector _sockets) {
s = _s;
test = _test;
sockets = _sockets;
System.out.print("creating object streams... ");
try {
oos = new ObjectOutputStream(s.getOutputStream());
ois = new ObjectInputStream(s.getInputStream());
} catch(Exception e) {
e.printStackTrace();
}
System.out.println("done");
run = true;
}

public void run() {
holder h = null, in = null;
System.out.println("worker starting");
while((h = test.next()) != null) {
System.out.println("sending h.val=" + h.val);
try {
System.out.println("sending... ");
oos.writeObject(h);
System.out.println("reading... ");
in = (holder) ois.readObject();
h.val = in.val;
h.evaluated = true;
} catch(Exception e) {
e.printStackTrace();
break;
}
}
try {
s.close();
} catch(Exception e) {
e.printStackTrace();
}
sockets.remove(s);
}
}

class holder implements Serializable {
public boolean evaluated = false;
public int val = 0;
public holder(int _val) {
evaluated = false;
val = _val;
}
public String toString() {
return "evaluated=" + evaluated + " value=" + val;
}
}
----- test.java ------

---- test output -----
[user@x z]$ java test 9999
h.val=2
h.val=141
h.val=97
h.val=3
h.val=65
h.val=175
h.val=33
h.val=113
h.val=191
h.val=150
sleeping for five seconds...accepted socket Socket[addr=/127.0.0.1,port=351
92,localport=9999]
creating object streams... done
starting
nsockets = 1
notifying all waiting processes
activitating workers... creating worker 0creating object streams... java.ne
t.SocketException: Connection reset by peer: Connection reset by peer
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:116)
at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream
.java:2135)
at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputS
tream.java:2148)
at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectI
nputStream.java:2619)
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.jav
a:726)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:251)
at worker.<init>(test.java:140)
at test.work(test.java:46)
at test.main(test.java:121)
done
starting worker finished worker done
joining threads ... 0worker starting
sending h.val=158
sending...
java.net.SocketException: Socket closed
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:
92)
at java.net.SocketOutputStream.write(SocketOutputStream.java:126)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOut
putStream.java:1637)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMod
e(ObjectOutputStream.java:1546)
at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.
java:1146)
at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.jav
a:1100)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStrea
m.java:1239)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:
1052)
at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStrea
m.java:1353)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:2
81)
at worker.run(test.java:155)
done
arr[i]=evaluated=false value=158
arr[i]=evaluated=false value=100
arr[i]=evaluated=false value=43
arr[i]=evaluated=false value=6
arr[i]=evaluated=false value=36
arr[i]=evaluated=false value=15
arr[i]=evaluated=false value=140
arr[i]=evaluated=false value=31
arr[i]=evaluated=false value=123
arr[i]=evaluated=false value=11
<I break the application here>
---- test output -----

---- client.java -----
import java.io.*;
import java.net.*;
import java.util.*;

public class client {
int port = 0;

public client(String host, int port) {
Socket s = null;
ObjectInputStream ois = null;
ObjectOutputStream oos = null;
try {
s = new Socket(host, port);
System.out.print("creating object streams... ");
ois = new ObjectInputStream(s.getInputStream());
oos = new ObjectOutputStream(s.getOutputStream());
System.out.println("done");
System.out.println("connected to port " + s);
while(true) {
System.out.print("reading... ");
holder h = (holder) ois.readObject();
System.out.println("received");
h.val += 1000;
try { Thread.sleep(2000); } catch(Exception
e) {}
System.out.print("sending... ");
oos.writeObject(h);
System.out.println("sent");
}
} catch(Exception e) {
e.printStackTrace();
}
try {
oos.close();
ois.close();
s.close();
} catch(Exception e) {
// nothing
}
}

public static void main(String args[]) {
new client(args[0], Integer.parseInt(args[1]));
}
}
---- client.java -----

---- client output ---
[user@x z]$ java client localhost 9999
creating object streams... done
connected to port Socket[addr=localhost/127.0.0.1,port=9999,localport=35185
]
reading... java.io.StreamCorruptedException
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:129
1)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:322)
at client.<init>(client.java:21)
at client.main(client.java:42)
[user@x z]$ java client localhost 9999
---- client output ---

0 new messages