Author: arevilla
Date: Fri Aug 3 15:03:41 2007
New Revision: 43
Added:
trunk/opt/saf/
trunk/opt/saf/deploy/
trunk/opt/saf/deploy/20_saf.xml.sample
trunk/opt/saf/deploy/95_saf_monitor.xml.sample
trunk/opt/saf/src/
trunk/opt/saf/src/org/
trunk/opt/saf/src/org/jpos/
trunk/opt/saf/src/org/jpos/saf/
trunk/opt/saf/src/org/jpos/saf/SAF.java
trunk/opt/saf/src/org/jpos/saf/SAFMonitor.java
Log:
Store and Forward service
Added: trunk/opt/saf/deploy/20_saf.xml.sample
=========================================================================== ===
--- (empty file)
+++ trunk/opt/saf/deploy/20_saf.xml.sample Fri Aug 3 15:03:41 2007
@@ -0,0 +1,17 @@
+<saf name='saf' logger='Q2' realm='saf' class='org.jpos.saf.SAF'>
+ <property name='space' value='jdbm:saf' />
+ <property name='mux' value='mux' />
+ <property name='flag-retransmissions' value='no'>
+ if MTI is in list, messages would be retransmitted as xxx1
+ </property>
+ <property name='initial-delay' value='60000' />
+ <property name='inter-message-delay' value='1000' />
+ <property name='wait-for-response' value='60000' />
+ <property name='max-retransmissions' value='1000' />
+ <property name='expire-after' value='86400'>
+ in seconds
+ </property>
+ <property name='valid-response-codes' value='*' />
+ <property name='retry-response-codes' value='91,ZZ' />
+</saf>
+
Added: trunk/opt/saf/deploy/95_saf_monitor.xml.sample
=========================================================================== ===
--- (empty file)
+++ trunk/opt/saf/deploy/95_saf_monitor.xml.sample Fri Aug 3 15:03:41 2007
@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<saf-monitor class='org.jpos.ee.status.Monitor' logger='Q2'>
+ <monitor id="SAF" delay='10000' period='30000'>
+ <class>org.jpos.saf.SAFMonitor</class>
+ <property name='saf' value='saf' />
+ </monitor>
+</saf-monitor>
+
Added: trunk/opt/saf/src/org/jpos/saf/SAF.java
=========================================================================== ===
--- (empty file)
+++ trunk/opt/saf/src/org/jpos/saf/SAF.java Fri Aug 3 15:03:41 2007
@@ -0,0 +1,306 @@
+/*
+ * jPOS Extended Edition
+ * Copyright (C) 2004 Alejandro P. Revilla
+ * jPOS.org (http://jpos.org)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+package org.jpos.saf;
+
+import java.io.PrintStream;
+import java.util.Date;
+import java.io.Serializable;
+
+import org.jpos.iso.ISOMsg;
+import org.jpos.iso.ISOException;
+import org.jpos.core.Configuration;
+import org.jpos.core.ConfigurationException;
+import org.jpos.q2.QBeanSupport;
+import org.jpos.space.Space;
+import org.jpos.space.SpaceUtil;
+import org.jpos.space.JDBMSpace;
+import org.jpos.space.SpaceFactory;
+import org.jpos.iso.MUX;
+import org.jpos.iso.ISOUtil;
+import org.jpos.util.Logger;
+import org.jpos.util.LogEvent;
+import org.jpos.util.Loggeable;
+import org.jpos.util.NameRegistrar;
+
+public class SAF extends QBeanSupport implements Runnable, Loggeable {
+ Space psp;
+ MUX mux;
+ long initialDelay;
+ long interMessageDelay;
+ long waitForResponse;
+ int maxRetransmissions;
+ long expiration;
+ String flagRetransmissions;
+ String validResponseCodes;
+ String retryResponseCodes;
+ String queue;
+ String head;
+
+ public void initService() {
+ queue = getName();
+ head = getName() + ".head";
+ NameRegistrar.register (getName(), this);
+ }
+ public void startService() {
+ // we re-register just in case the component was soft-stopped
+ NameRegistrar.register (getName(), this);
+ new Thread(this).start();
+ }
+ public void stopService() {
+ NameRegistrar.unregister (getName());
+ }
+ public void run() {
+ Thread.currentThread().setName (getName());
+ while (running()) {
+ if (!mux.isConnected()) {
+ ISOUtil.sleep (1000);
+ continue;
+ }
+ try {
+ if (latchMsg()) {
+ Entry entry = (Entry) psp.rdp (head);
+ entry = send (entry);
+ synchronized (psp) {
+ autoCommitOff ();
+ psp.inp (head);
+ if (entry != null)
+ psp.out (head, entry);
+ autoCommitOn ();
+ }
+ if (interMessageDelay > 0)
+ ISOUtil.sleep (interMessageDelay);
+ }
+ } catch (Exception e) {
+ getLog().error (e);
+ ISOUtil.sleep (15000L); // "easy" on exceptions
+ }
+ }
+ }
+ public void setConfiguration (Configuration cfg)
+ throws ConfigurationException
+ {
+ super.setConfiguration (cfg);
+ psp = SpaceFactory.getSpace (cfg.get ("space"));
+ String muxName = "mux." + cfg.get ("mux");
+ try {
+ mux = (MUX) NameRegistrar.get (muxName);
+ } catch (NameRegistrar.NotFoundException e) {
+ throw new ConfigurationException (
+ "MUX '" + muxName + "' not found in registrar."
+ );
+ }
+ flagRetransmissions = cfg.get ("flag-retransmissions", "");
+ initialDelay = cfg.getLong ("initial-delay", 30000L);
+ interMessageDelay = cfg.getLong ("inter-message-delay", 1000L);
+ waitForResponse = cfg.getLong ("wait-for-response", 60000L);
+ maxRetransmissions = cfg.getInt ("max-retransmissions", 0);
+ expiration = cfg.getLong ("expire-after", 0L) * 1000L;
+ validResponseCodes = cfg.get ("valid-response-codes", "00");
+ retryResponseCodes = cfg.get ("retry-response-codes", null);
+ }
+ public static SAF getSAF (String name) throws NameRegistrar.NotFoundException {
+ return (SAF) NameRegistrar.get (name);
+ }
+ public Space getSpace() {
+ return psp;
+ }
+ /**
+ * queue message for transmission
+ * @param msg message to send
+ */
+ public void send (ISOMsg msg) {
+ send (msg, null, 0L, false);
+ }
+ /**
+ * queue message for transmission
+ * @param msg message to send
+ * @param responseKey if not null, on response, put the response message on the space
+ * @param responseTimeout optional timeout for response message
+ * @param wipe if true and responseKey is not null, SAF would wipe previous entries
+ */
+ public void send (ISOMsg msg, String responseKey, long responseTimeout, boolean wipe) {
+ psp.out (queue, new Entry(msg, responseKey, responseTimeout, wipe));
+ }
+ public void dump (PrintStream p, String indent) {
+ String inner = indent + " ";
+ p.println (indent + "<saf name='" + getName() + "'>");
+ p.println (inner + getStatus());
+ p.println (indent + "</saf>");
+ }
+ public String getStatus() {
+ StringBuffer sb = new StringBuffer();
+ sb.append ("mux=");
+ sb.append (mux.isConnected() ? "ready" : "not-ready");
+ if (psp instanceof JDBMSpace) {
+ sb.append (", queue-size=");
+ sb.append (((JDBMSpace)psp).size (queue));
+ }
+ Entry latched = (Entry) psp.rdp (head);
+ if (latched != null) {
+ if (latched.count > 0)
+ sb.append (" head transmitted " + latched.count + " time(s)");
+ }
+ return sb.toString();
+ }
+ private boolean latchMsg() {
+ Entry entry = (Entry) psp.rdp (head);
+ if (entry == null) {
+ entry = (Entry) psp.rd (queue, 5000L);
+ if (entry != null) {
+ synchronized (psp) {
+ autoCommitOff ();
+ psp.out (head, psp.in (queue));
+ autoCommitOn ();
+ }
+ }
+ }
+ return entry != null;
+ }
+ private void autoCommitOn () {
+ if (psp instanceof JDBMSpace)
+ ((JDBMSpace)psp).setAutoCommit(true);
+ }
+ private void autoCommitOff () {
+ if (psp instanceof JDBMSpace)
+ ((JDBMSpace)psp).setAutoCommit(false);
+ }
+ private Entry send (Entry entry) {
+ if (shouldIgnore(entry)) {
+ LogEvent evt = getLog().createLogEvent("saf-warning");
+ if (isMaxRetransmission (entry))
+ evt.addMessage ("max retransmission count ("+maxRetransmissions+") has been reached.");
+ if (isExpired (entry)) {
+ Date d = new Date (entry.time);
+ evt.addMessage ("message queued on " + d.toString() + " is now expired.");
+ }
+ evt.addMessage (entry.msg);
+ Logger.log (evt);
+ return null;
+ }
+ try {
+ ISOMsg resp = mux.request (entry.msg, waitForResponse);
+ if (resp == null) {
+ // we don't count if we don't get a response, the request
+ // will expire at its expiration time
+ return entry;
+ }
+ if (resp.hasField (39)) {
+ String rc = resp.getString(39);
+ if (retryResponseCodes != null && retryResponseCodes.indexOf(rc) >= 0) {
+ // this result code requires retransmission, so we don't increase
+ // the retransmission counter. The request may expire though
+ LogEvent evt = createLogEvent ("info", entry, resp);
+ evt.addMessage ("response code '"
+ + resp.getString(39)
+ + "' is in retry-response-codes list ("
+ + retryResponseCodes + ")");
+ Logger.log (evt);
+ return entry;
+ }
+ entry.count++;
+ if ("*".equals (validResponseCodes) || validResponseCodes.indexOf(rc) >= 0) {
+ LogEvent evt = createLogEvent ("info", entry, resp);
+ evt.addMessage ("response code '"
+ + resp.getString(39)
+ + "' is in valid-response-codes list ("
+ + validResponseCodes + ")");
+ Logger.log (evt);
+ // GOOD - Message was sent
+ if (entry.responseKey != null) {
+ if (entry.wipePreviousResponse)
+ SpaceUtil.wipe (psp, entry.responseKey);
+ if (entry.responseTimeout > 0L)
+ psp.out (entry.responseKey, resp, entry.responseTimeout);
+ else
+ psp.out (entry.responseKey, resp);
+ }
+ return null;
+ } else {
+ LogEvent evt = createLogEvent ("info", entry, resp);
+ evt.addMessage ("response code '"
+ + resp.getString(39)
+ + "' not in valid-response-codes list ("
+ + validResponseCodes + ")");
+ Logger.log (evt);
+ }
+ }
+ if (entry.count == 1 && flagRetransmissions.indexOf (entry.msg.getMTI()) >= 0)
+ entry.msg.setRetransmissionMTI(); // we'll send a retransmission on next try
+ } catch (ISOException e) {
+ LogEvent evt = createLogEvent ("error", entry);
+ evt.addMessage ("Error " + e.toString() + " while sending message.");
+ evt.addMessage ("--- stack trace ---");
+ evt.addMessage (e);
+ Logger.log (evt);
+ }
+ return entry;
+ }
+ private LogEvent createLogEvent (String type, Entry entry, ISOMsg resp) {
+ LogEvent evt = getLog().createLogEvent(type);
+ evt.addMessage (" Message timestamp: " + new Date(entry.time));
+ evt.addMessage ("Transmission count: " + entry.count);
+ evt.addMessage ("--- request ---");
+ evt.addMessage (entry.msg);
+ if (resp != null) {
+ evt.addMessage ("--- response ---");
+ evt.addMessage (resp);
+ }
+ return evt;
+ }
+ private LogEvent createLogEvent (String type, Entry entry) {
+ return createLogEvent (type, entry, null);
+ }
+ private boolean isExpired (Entry e) {
+ return expiration == 0 ? false :
+ System.currentTimeMillis() > (e.time + expiration);
+ }
+ private boolean isMaxRetransmission(Entry e) {
+ return maxRetransmissions == 0 ? false :
+ e.count > maxRetransmissions;
+ }
+ private boolean shouldIgnore(Entry e) {
+ return isExpired(e) || isMaxRetransmission(e);
+ }
+ public static class Entry implements Serializable {
+ public static final long serialVersionUID = 1L;
+ public ISOMsg msg;
+ public long time;
+ public int count;
+ public String responseKey;
+ public long responseTimeout;
+ public boolean wipePreviousResponse;
+ public Entry () {
+ super();
+ this.time = System.currentTimeMillis();
+ }
+ public Entry (ISOMsg msg) {
+ this (msg, null, 0L, false);
+ }
+ public Entry (ISOMsg msg, String responseKey, long responseTimeout, boolean wipePreviousResponse) {
+ this();
+ this.msg = msg;
+ this.responseKey = responseKey;
+ this.responseTimeout = responseTimeout;
+ this.wipePreviousResponse = wipePreviousResponse;
+ }
+ }
+}
+
Added: trunk/opt/saf/src/org/jpos/saf/SAFMonitor.java
=========================================================================== ===
--- (empty file)
+++ trunk/opt/saf/src/org/jpos/saf/SAFMonitor.java Fri Aug 3 15:03:41 2007
@@ -0,0 +1,55 @@
+/*
+ * jPOS Extended Edition
+ * Copyright (C) 2005 Alejandro P. Revilla
+ * jPOS.org (http://jpos.org)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+package org.jpos.saf;
+
+import java.net.Socket;
+import java.io.IOException;
+import org.jpos.core.Configurable;
+import org.jpos.core.Configuration;
+import org.jpos.core.ConfigurationException;
+import org.jpos.util.Log;
+import org.jpos.util.NameRegistrar;
+import org.jpos.ee.status.Status;
+import org.jpos.ee.status.MonitorTask;
+
+public class SAFMonitor extends Log implements MonitorTask, Configurable {
+ Configuration cfg;
+ String safName;
+ public String checkService () {
+ boolean rc = false;
+ SAF saf;
+ try {
+ saf = SAF.getSAF (safName);
+ } catch (NameRegistrar.NotFoundException e) {
+ return Status.ERROR + " saf not found";
+ }
+ return Status.OK + " " + saf.getStatus();
+ }
+ public void setConfiguration (Configuration cfg)
+ throws ConfigurationException
+ {
+ this.cfg = cfg;
+ safName = cfg.get ("saf", null);
+ if (safName == null)
+ throw new ConfigurationException ("property 'saf' is null");
+ }
+}
+