PB MUX reconnect

74 views
Skip to first unread message

yosri chaat

unread,
May 16, 2013, 9:08:15 AM5/16/13
to jpos-...@googlegroups.com
Hi,

I'm using ISOMUX as a client, with a custom channel that simply extends BaseChannel.

After a disconnection, neither the MUX nor the channel gets reconnected.

Is there any way to make it reconnect automatically?

NB: I'm still using the 1.6 release.

Thanks

Yosri

chhil

unread,
May 16, 2013, 9:29:44 AM5/16/13
to jpos-...@googlegroups.com
I don't have much detail about your code; maybe check whether the thread that runs the mux needs to be restarted.

Ideally start looking at using QMux instead of ISOMux.

-chhil

--
--
jPOS is licensed under AGPL - free for community usage for your open-source project. Licenses are also available for commercial usage.
Please support jPOS, contact: sa...@jpos.org
 
You received this message because you are subscribed to the "jPOS Users" group.
Please see http://jpos.org/wiki/JPOS_Mailing_List_Readme_first
To post to this group, send email to jpos-...@googlegroups.com
To unsubscribe, send email to jpos-users+...@googlegroups.com
For more options, visit this group at http://groups.google.com/group/jpos-users
 
---
You received this message because you are subscribed to the Google Groups "jPOS Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to jpos-users+...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 

yosri chaat

unread,
May 16, 2013, 10:01:28 AM5/16/13
to jpos-...@googlegroups.com
Thanks Chhil for the quick response,
 
My code looks like :
 
***************
    // Build the client-side channel. MyChannel's constructor is not shown here;
    // presumably it takes (host, port, packager) -- confirm against MyChannel.
    externalChannel = new MyChannel(Ip,
                                     port,
                                     new MyPackager());
    // Wrap the channel in an ISOMUX; the channel may be connected or unconnected.
    ISOMUX mux = new ISOMUX(externalChannel);
    MyProcessor myProcessor = new MyProcessor ();
    // Messages the MUX cannot match to a pending request are forwarded to this listener.
    mux.setISORequestListener(myProcessor );
    // ISOMUX is a Runnable: its run() loop connects the channel, transmits queued
    // messages and starts the receiver thread, so it needs a thread of its own.
    Thread tr2 = new Thread(mux);
    tr2.start();
******************
where MyChannel class looks like  :
 
******************
public class MyChannel
    extends BaseChannel {
...}
******************

******************
and ISOMux class looks like  :
 
******************
/**
 * Should run in it's own thread. Starts another Receiver thread
 *
 * @author <a href="mailto:a...@cs.com.uy">Alejandro P. Revilla</a>
 * @version $Revision: 1.52 $ $Date: 2004/07/22 15:33:21 $
 * @see ISORequest
 * @see ISOChannel
 * @see ISOException
 * @see ISORequestListener
 */
public class ISOMUX
    implements Runnable, ISOSource, LogSource, MUX,
    ReConfigurable, Loggeable, ISOMUXMBean {
  private ISOChannel channel;
  private Thread rx = null, tx = null;
  private Vector txQueue;
  private Hashtable rxQueue;
  private int traceNumberField = 11;
  private volatile boolean terminate = false;
  private String name;
  private ISOMUX muxInstance;
  private boolean doConnect;
  protected Logger logger = null;
  protected String realm = null;
  public static final int CONNECT = 0;
  public static final int TX = 1;
  public static final int RX = 2;
  public static final int TX_EXPIRED = 3;
  public static final int RX_EXPIRED = 4;
  public static final int TX_PENDING = 5;
  public static final int RX_PENDING = 6;
  public static final int RX_UNKNOWN = 7;
  public static final int RX_FORWARDED = 8;
  public static final int SIZEOF_CNT = 9;
  private int[] cnt;
  private ISORequestListener requestListener;
  /**
   * @param c a connected or unconnected ISOChannel
   */
  public ISOMUX(ISOChannel c) {
    super();
    initMUX(c);
  }
  /**
   * @param c a connected or unconnected ISOChannel
   * @param logger a logger
   * @param realm  logger's realm
   */
  public ISOMUX(ISOChannel c, Logger logger, String realm) {
    super();
    setLogger(logger, realm);
    initMUX(c);
  }
  public void setConfiguration(Configuration cfg) {
    setTraceNumberField(cfg.getInt("tracenofield"));
  }
  private void initMUX(ISOChannel c) {
    doConnect = true;
    channel = c;
    rx = null;
    txQueue = new Vector();
    rxQueue = new Hashtable();
    cnt = new int[SIZEOF_CNT];
    requestListener = null;
    rx = new Thread(new Receiver(this));
    name = "";
    muxInstance = this;
  }
  /**
   * allow changes to default value 11 (used in ANSI X9.2 messages)
   * @param traceNumberField new traceNumberField
   */
  public void setTraceNumberField(int traceNumberField) {
    if (traceNumberField > 0) {
      this.traceNumberField = traceNumberField;
    }
  }
  /**
   * @return the underlying ISOChannel
   */
  public ISOChannel getISOChannel() {
    return channel;
  }
  /**
   * set an ISORequestListener for unmatched messages
   * @param rl a request listener object
   * @see ISORequestListener
   */
  public void setISORequestListener(ISORequestListener rl) {
    requestListener = rl;
  }
  /**
   * remove possible ISORequestListener
   * @see ISORequestListener
   */
  public void removeISORequestListener() {
    requestListener = null;
  }
  /**
   * construct key to match request with responses
   * @param   m   request/response
   * @return      key (default terminal(41) + tracenumber(11))
   */
  protected String getKey(ISOMsg m) throws ISOException {
    return (m.hasField(41) ? ISOUtil.zeropad( (String) m.getValue(41), 16) : "")
        + (m.hasField(traceNumberField) ?
           ISOUtil.zeropad( (String) m.getValue(traceNumberField), 6) :
           Long.toString(System.currentTimeMillis()));
  }
  /**
   * get rid of expired requests
   */
  private void purgeRxQueue() {
    Enumeration e = rxQueue.keys();
    while (e.hasMoreElements()) {
      Object key = e.nextElement();
      ISORequest r = (ISORequest) rxQueue.get(key);
      if (r != null && r.isExpired()) {
        rxQueue.remove(key);
        cnt[RX_EXPIRED]++;
      }
    }
  }
  /**
   * show Counters
   * @param p - where to print
   */
  public void showCounters(PrintStream p) {
    int[] c = getCounters();
    p.println("           Connections: " + c[CONNECT]);
    p.println("           TX messages: " + c[TX]);
    p.println("            TX expired: " + c[TX_EXPIRED]);
    p.println("            TX pending: " + c[TX_PENDING]);
    p.println("           RX messages: " + c[RX]);
    p.println("            RX expired: " + c[RX_EXPIRED]);
    p.println("            RX pending: " + c[RX_PENDING]);
    p.println("          RX unmatched: " + c[RX_UNKNOWN]);
    p.println("          RX forwarded: " + c[RX_FORWARDED]);
  }
  /**
   * get the counters in order to pretty print them
   * or for stats purposes
   */
  public int[] getCounters() {
    cnt[TX_PENDING] = txQueue.size();
    cnt[RX_PENDING] = rxQueue.size();
    return cnt;
  }
  public void resetCounters() {
    cnt = new int[SIZEOF_CNT];
  }
  /**
   * @return number of re-connections on the underlying channel
   */
  public int getConnectionCount() {
    return cnt[CONNECT];
  }
  /**
   * @return number of transmitted messages
   */
  public int getTransmitCount() {
    return cnt[TX];
  }
  /**
   * @return number of expired messages
   */
  public int getExpiredCount() {
    return cnt[TX_EXPIRED];
  }
  /**
   * @return number of messages waiting to be transmited
   */
  public int getTransmitPendingCount() {
    return txQueue.size();
  }
  /**
   * @return number of received messages
   */
  public int getReceiveCount() {
    return cnt[RX];
  }
  /**
   * @return number of unanswered messages
   */
  public int getReceiveExpiredCount() {
    return cnt[RX_EXPIRED];
  }
  /**
   * @return number of messages waiting for response
   */
  public int getReceivePendingCount() {
    return rxQueue.size();
  }
  /**
   * @return number of unknown messages received
   */
  public int getUnknownCount() {
    return cnt[RX_UNKNOWN];
  }
  /**
   * @return number of forwarded messages received
   */
  public int getForwardedCount() {
    return cnt[RX_FORWARDED];
  }
  private class Receiver
      implements Runnable, LogSource {
    Runnable parent;
    protected Receiver(Runnable p) {
      parent = p;
    }
    public void run() {
      int i = 0;
      while (!terminate || !rxQueue.isEmpty() || !txQueue.isEmpty()) {
        if (i++ % 250 == 1) {
          Logger.log(new LogEvent(this, "mux", parent));
        }
        if (channel.isConnected()) {
          try {
            ISOMsg d = channel.receive();
            cnt[RX]++;
            String k = getKey(d);
            ISORequest r = (ISORequest) rxQueue.get(k);
            boolean forward = true;
            if (r != null) {
              rxQueue.remove(k);
              synchronized (r) {
                if (r.isExpired()) {
                  if ( (++cnt[RX_EXPIRED]) % 10 == 0) {
                    purgeRxQueue();
                  }
                }
                else {
                  r.setResponse(d);
                  forward = false;
                }
              }
            }
            if (forward) {
              if (requestListener != null) {
                requestListener.process(muxInstance, d);
                cnt[RX_FORWARDED]++;
              }
              else {
                cnt[RX_UNKNOWN]++;
              }
            }
          }
          catch (Throwable e) {
            if (!terminate) {
              channel.setUsable(false);
              if (! (e instanceof EOFException)) {
                Logger.log(new LogEvent(this, "muxreceiver", e));
              }
              synchronized (parent) {
                parent.notify();
              }
            }
          }
        }
        else {
          try {
            synchronized (rx) {
              rx.wait();
            }
          }
          catch (InterruptedException e) {
            Logger.log(new LogEvent(this, "muxreceiver", e));
          }
        }
      }
      Logger.log(new LogEvent(this, "muxreceiver", "terminate"));
    }
    public void setLogger(Logger logger, String realm) {}
    public String getRealm() {
      return realm;
    }
    public Logger getLogger() {
      return logger;
    }
  }
  private void doTransmit() throws ISOException, IOException {
    while (txQueue.size() > 0) {
      Object o = txQueue.firstElement();
      ISOMsg m = null;
      if (o instanceof ISORequest) {
        ISORequest r = (ISORequest) o;
        if (r.isExpired()) {
          cnt[TX_EXPIRED]++;
        }
        else {
          m = r.getRequest();
          rxQueue.put(getKey(m), r);
          r.setTransmitted();
          synchronized (rx) {
            rx.notify(); // required by ChannelPool
          }
        }
      }
      else if (o instanceof ISOMsg) {
        m = (ISOMsg) o;
      }
      if (m != null) {
        try {
          channel.send(m);
          cnt[TX]++;
        }
        catch (ISOException e) {
          Logger.log(new LogEvent(this, "error", e));
        }
      }
      txQueue.removeElement(o);
      txQueue.trimToSize();
    }
  }
  public void run() {
    tx = Thread.currentThread();
    int rxPriority = rx.getPriority(); // Bug#995787
    if (rxPriority < Thread.MAX_PRIORITY) {
      // OS/400 V4R4 JVM
      rx.setPriority(rxPriority + 1); // Thread problem
      // (Vincent...@amo.com)
    }
    rx.start();
    boolean firstTime = true;
    while (!terminate || !txQueue.isEmpty()) {
      try {
        if (channel.isConnected()) {
          doTransmit();
        }
        else if (doConnect) {
          if (firstTime) {
            firstTime = !firstTime;
            channel.connect();
          }
          else {
            Thread.sleep(5000);
            channel.reconnect();
          }
          cnt[CONNECT]++;
          synchronized (rx) {
            rx.notify();
          }
        }
        else {
          // nothing to do ...
          try {
            Thread.sleep(5000);
          }
          catch (InterruptedException ex) {}
        }
        synchronized (this) {
          if (!terminate &&
              channel.isConnected() &&
              txQueue.size() == 0) {
            this.wait();
          }
        }
      }
      catch (ConnectException e) {
        if (channel instanceof ClientChannel) {
          ClientChannel cc = (ClientChannel) channel;
          Logger.log(new LogEvent(this, "connection-refused",
                                  cc.getHost() + ":" + cc.getPort())
              );
        }
        try {
          Thread.sleep(1000);
        }
        catch (InterruptedException ex) {}
      }
      catch (Exception e) {
        Logger.log(new LogEvent(this, "mux", e));
        try {
          Thread.sleep(1000);
        }
        catch (InterruptedException ex) {}
      }
    }
    // Wait for the receive queue to empty out before shutting down
    while (!rxQueue.isEmpty()) {
      try {
        Thread.sleep(5000); // Wait for the receive queue to clear.
        purgeRxQueue(); // get rid of expired stuff
      }
      catch (InterruptedException e) {
        break;
      }
    }
    // By closing the channel, we force the receive thread to terminate
    try {
      channel.disconnect();
    }
    catch (IOException e) {}
    synchronized (rx) {
      rx.notify();
    }
    try {
      rx.join();
    }
    catch (InterruptedException e) {}
    Logger.log(new LogEvent(this, "mux", "terminate"));
  }
  /**
   * queue an ISORequest
   */
  synchronized public void queue(ISORequest r) {
    txQueue.addElement(r);
    this.notify();
  }
  /**
   * send a message over channel, usually a
   * response from an ISORequestListener
   */
  synchronized public void send(ISOMsg m) {
    txQueue.addElement(m);
    this.notify();
  }
  private void terminate(boolean hard) {
    LogEvent evt = new LogEvent(this, "mux",
                                "<terminate type=\"" + (hard ? "hard" : "soft") + "\"/>");
    evt.addMessage(this);
    Logger.log(evt);
    terminate = true;
    synchronized (this) {
      if (hard) {
        txQueue.removeAllElements();
        rxQueue.clear();
      }
      this.notify();
    }
  }
  /**
   * terminate MUX
   * @param wait Time to wait before forcing shutdown
   */
  public void terminate(int wait) {
    terminate(false);
    try {
      tx.join(wait);
      if (tx.isAlive()) {
        terminate(true);
        tx.join();
      }
    }
    catch (InterruptedException e) {}
  }
  /**
   * terminate MUX (soft terminate, wait forever if necessary)
   */
  public void terminate() {
    terminate(0);
  }
  public boolean isConnected() {
    return channel.isConnected();
  }
  public void setLogger(Logger logger, String realm) {
    this.logger = logger;
    this.realm = realm;
  }
  public String getRealm() {
    return realm;
  }
  public Logger getLogger() {
    return logger;
  }
  public boolean isTerminating() {
    return terminate;
  }
  /**
   * associates this ISOMUX with a name using NameRegistrar
   * @param name name to register
   * @see NameRegistrar
   */
  public void setName(String name) {
    this.name = name;
    NameRegistrar.register("mux." + name, this);
  }
  /**
   * @return ISOMUX instance with given name.
   * @throws NameRegistrar.NotFoundException;
   * @see NameRegistrar
   */
  public static ISOMUX getMUX(String name) throws NameRegistrar.NotFoundException {
    return (ISOMUX) NameRegistrar.get("mux." + name);
  }
  /**
   * @return this ISOMUX's name ("" if no name was set)
   */
  public String getName() {
    return this.name;
  }
  /**
   * ISOMUXs usually calls connect() on the underlying ISOChannel<br>
   * You can prevent this behaveour by passing a false value.
   * @param connect false to prevent connection (default true)
   */
  public void setConnect(boolean connect) {
    this.doConnect = connect;
    if (!connect && isConnected()) {
      channel.setUsable(false);
      try {
        channel.disconnect();
      }
      catch (IOException e) {
        Logger.log(new LogEvent(this, "set-connect", e));
      }
      synchronized (this) {
        this.notify();
      }
    }
  }
  /**
   * @return connect flag value
   */
  public boolean getConnect() {
    return doConnect;
  }
  public void dump(PrintStream p, String indent) {
    p.println(indent + "<mux-stats connected=\"" +
              channel.isConnected() + "\">");
    showCounters(p);
    p.println(indent + "</mux-stats>");
  }
  public ISOMsg request(ISOMsg m, long timeout) throws ISOException {
    ISORequest req = new ISORequest(m);
    queue(req);
    return req.getResponse( (int) timeout);
  }
}

2013/5/16 chhil <chi...@gmail.com>

Alejandro Revilla

unread,
May 16, 2013, 11:46:03 AM5/16/13
to jpos-...@googlegroups.com
Why don't you use 1.9 series, Q2 and QMUX? ISOMUX is very old.

--
@apr



--

yosri chaat

unread,
May 17, 2013, 4:28:20 AM5/17/13
to jpos-...@googlegroups.com
Hi Alejandro,
 
Migrating from the old approach to Q2 would be quite hard for some existing applications.

2013/5/16 Alejandro Revilla <a...@jpos.org>
Reply all
Reply to author
Forward
0 new messages