I'm trying to implement a scatter/gather piece of code.
Does anyone see what I've done wrong?
Mike
----- test.java ------
import java.io.*;
import java.net.*;
import java.util.*;
public class test implements Runnable {
private int serverPort = 0;
private ServerSocket ss = null;
private Vector sockets = null;
private boolean run = false;
private holder arr[] = null;
private int idx = 0;
public test(int port) {
serverPort = port;
try {
ss = new ServerSocket(serverPort);
} catch(Exception e) {
e.printStackTrace();
}
sockets = new Vector();
run = true;
new Thread(this).start();
}
public void work(holder _arr[]) {
arr = _arr;
int i, nsockets = sockets.size();
worker workers[] = new worker[nsockets];
// initialize
System.out.println("nsockets = " + nsockets);
idx = 0;
// wake waiting threads
System.out.println("notifying all waiting processes");
try {
notifyAll();
} catch(Exception e) {
// nothing
}
// start the worker threads
System.out.print("activitating workers...");
for(i = 0; i < nsockets; i++) {
System.out.print(" creating worker " + i);
workers[i] = new worker((Socket) sockets.get(i), th
is, sockets);
System.out.print(" starting worker");
workers[i].start();
System.out.print(" finished worker");
}
System.out.println(" done");
// join the worker threads
System.out.print("joining threads ...");
try {
for(i = 0; i < nsockets; i++) {
System.out.print(" " + i);
workers[i].join();
}
} catch(Exception e) {
e.printStackTrace();
}
System.out.println(" done");
// print the results
for(i = 0; i < arr.length; i++) {
System.out.println("arr[i]=" + arr[i]);
}
}
public synchronized holder next() {
holder h = null;
if(arr == null || arr.length == 0) {
try {
wait();
} catch(Exception e) {
// nothing
}
}
while(idx < arr.length) {
if(arr[idx].evaluated == false) {
return arr[idx++];
}
idx++;
}
idx = 0;
while(idx < arr.length) {
if(arr[idx].evaluated == false) {
return arr[idx++];
}
idx++;
}
return null;
}
public void run() {
Socket s = null;
while(run) {
try {
s = ss.accept();
System.out.println("accepted socket " + s);
sockets.add(s);
new worker(s, this, sockets);
} catch(Exception e) {
e.printStackTrace();
}
}
}
public static void main(String args[]) {
Random r = new Random();
holder a[] = new holder[10];
for(int i = 0; i < a.length; i++) {
a[i] = new holder(r.nextInt(200));
System.out.println("h.val=" + a[i].val);
}
test t = new test(Integer.parseInt(args[0]));
System.out.print("sleeping for five seconds...");
try { Thread.sleep(5000); } catch(Exception e) { e.printSta
ckTrace(); }
System.out.println("starting");
t.work(a);
}
}
class worker extends Thread {
Vector sockets = null;
boolean run = false;
Socket s = null;
ObjectOutputStream oos = null;
ObjectInputStream ois = null;
test test = null;
public worker(Socket _s, test _test, Vector _sockets) {
s = _s;
test = _test;
sockets = _sockets;
System.out.print("creating object streams... ");
try {
oos = new ObjectOutputStream(s.getOutputStream());
ois = new ObjectInputStream(s.getInputStream());
} catch(Exception e) {
e.printStackTrace();
}
System.out.println("done");
run = true;
}
public void run() {
holder h = null, in = null;
System.out.println("worker starting");
while((h = test.next()) != null) {
System.out.println("sending h.val=" + h.val);
try {
System.out.println("sending... ");
oos.writeObject(h);
System.out.println("reading... ");
in = (holder) ois.readObject();
h.val = in.val;
h.evaluated = true;
} catch(Exception e) {
e.printStackTrace();
break;
}
}
try {
s.close();
} catch(Exception e) {
e.printStackTrace();
}
sockets.remove(s);
}
}
class holder implements Serializable {
public boolean evaluated = false;
public int val = 0;
public holder(int _val) {
evaluated = false;
val = _val;
}
public String toString() {
return "evaluated=" + evaluated + " value=" + val;
}
}
----- test.java ------
---- test output -----
[user@x z]$ java test 9999
h.val=2
h.val=141
h.val=97
h.val=3
h.val=65
h.val=175
h.val=33
h.val=113
h.val=191
h.val=150
sleeping for five seconds...accepted socket Socket[addr=/127.0.0.1,port=351
92,localport=9999]
creating object streams... done
starting
nsockets = 1
notifying all waiting processes
activitating workers... creating worker 0creating object streams... java.ne
t.SocketException: Connection reset by peer: Connection reset by peer
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:116)
at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream
.java:2135)
at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputS
tream.java:2148)
at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectI
nputStream.java:2619)
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.jav
a:726)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:251)
at worker.<init>(test.java:140)
at test.work(test.java:46)
at test.main(test.java:121)
done
starting worker finished worker done
joining threads ... 0worker starting
sending h.val=158
sending...
java.net.SocketException: Socket closed
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:
92)
at java.net.SocketOutputStream.write(SocketOutputStream.java:126)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOut
putStream.java:1637)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMod
e(ObjectOutputStream.java:1546)
at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.
java:1146)
at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.jav
a:1100)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStrea
m.java:1239)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:
1052)
at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStrea
m.java:1353)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:2
81)
at worker.run(test.java:155)
done
arr[i]=evaluated=false value=158
arr[i]=evaluated=false value=100
arr[i]=evaluated=false value=43
arr[i]=evaluated=false value=6
arr[i]=evaluated=false value=36
arr[i]=evaluated=false value=15
arr[i]=evaluated=false value=140
arr[i]=evaluated=false value=31
arr[i]=evaluated=false value=123
arr[i]=evaluated=false value=11
<I break the application here>
---- test output -----
---- client.java -----
import java.io.*;
import java.net.*;
import java.util.*;
public class client {
int port = 0;
public client(String host, int port) {
Socket s = null;
ObjectInputStream ois = null;
ObjectOutputStream oos = null;
try {
s = new Socket(host, port);
System.out.print("creating object streams... ");
ois = new ObjectInputStream(s.getInputStream());
oos = new ObjectOutputStream(s.getOutputStream());
System.out.println("done");
System.out.println("connected to port " + s);
while(true) {
System.out.print("reading... ");
holder h = (holder) ois.readObject();
System.out.println("received");
h.val += 1000;
try { Thread.sleep(2000); } catch(Exception
e) {}
System.out.print("sending... ");
oos.writeObject(h);
System.out.println("sent");
}
} catch(Exception e) {
e.printStackTrace();
}
try {
oos.close();
ois.close();
s.close();
} catch(Exception e) {
// nothing
}
}
public static void main(String args[]) {
new client(args[0], Integer.parseInt(args[1]));
}
}
---- client.java -----
---- client output ---
[user@x z]$ java client localhost 9999
creating object streams... done
connected to port Socket[addr=localhost/127.0.0.1,port=9999,localport=35185
]
reading... java.io.StreamCorruptedException
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:129
1)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:322)
at client.<init>(client.java:21)
at client.main(client.java:42)
[user@x z]$ java client localhost 9999
---- client output ---