I'm using Mule 3.x Community Edition and want to handle connection failures (JDBC, FTP, ...). For instance, I pick up files from FTP and insert records into a database. The problem occurs when the JDBC connection fails during the transaction: the files have already been taken from the FTP server (and so deleted). How can I save them somewhere and automatically retransmit them when the connection is restored?
Thx,
Stevy
Best regards.
**
Stevy
Edited by: a a on 5 March 2011 17:52
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>${activemq.version}</version>
</dependency>
Thx,
**
Stevy
On the page you recommended, http://www.mulesoft.org/mule-activemq-integration-examples, I saw the following phrases: "Thanks to Mule ESB’s native support for Apache ActiveMQ and the capacity to transparently use Spring for advanced configuration needs" and "Mule, and its embedded ActiveMQ". Does this mean that ActiveMQ is bundled with Mule, or do I need to add the ActiveMQ libraries to Mule? (If I need to add external ActiveMQ libraries, could you briefly describe the whole process?)
Thx for your help,
**
Stevy
I've tried the configuration from http://www.mulesoft.org/mule-activemq-integration-examples:
<spring:beans>
<spring:bean id="AmqDeadLetterStrategy"
class="org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy"
p:queuePrefix="DLQ." />
<spring:bean id="AmqDefaultPolicyEntry"
class="org.apache.activemq.broker.region.policy.PolicyEntry"
p:queue="jobs.*"
p:deadLetterStrategy-ref="AmqDeadLetterStrategy" />
<spring:bean id="AmqPolicyMap"
class="org.apache.activemq.broker.region.policy.PolicyMap"
p:defaultEntry-ref="AmqDefaultPolicyEntry" />
<spring:bean name="AmqBroker"
class="org.apache.activemq.broker.BrokerService"
p:brokerName="esb-amq-broker"
p:persistent="true"
p:dataDirectory="${activemq.data.dir}"
p:useJmx="true"
p:useShutdownHook="false"
p:destinationPolicy-ref="AmqPolicyMap"
init-method="start"
destroy-method="stop" />
<spring:bean name="AmqRedeliveryPolicy"
class="org.apache.activemq.RedeliveryPolicy"
p:useExponentialBackOff="true" />
<spring:bean name="AmqConnectionFactory"
class="org.apache.activemq.spring.ActiveMQConnectionFactory"
p:brokerURL="vm://esb-amq-broker"
p:redeliveryPolicy-ref="AmqRedeliveryPolicy"
depends-on="AmqBroker" />
</spring:beans>
<jms:activemq-connector name="EsbJmsConnector"
specification="1.1"
connectionFactory-ref="AmqConnectionFactory"
disableTemporaryReplyToDestinations="true"
persistentDelivery="true">
</jms:activemq-connector>
**
I got an error:
**
ERROR 2011-03-06 20:36:52,390 [main] org.mule.config.spring.SpringXmlConfigurationBuilder: Configuration with "org.mule.config.spring.SpringXmlConfigurationBuilder" failed.
org.mule.api.lifecycle.InitialisationException: Line 50 in XML document from URL [file:/D:/workspace_eclipse/ActiveMQ_Test/conf/mule-config.xml] is invalid; nested exception is org.xml.sax.SAXParseException: The prefix "p" for attribute "p:queuePrefix" associated with an element type "spring:bean" is not bound.
at org.mule.registry.AbstractRegistry.initialise(AbstractRegistry.java:115)
at org.mule.config.spring.SpringXmlConfigurationBuilder.createSpringRegistry(SpringXmlConfigurationBuilder.java:116)
at org.mule.config.spring.SpringXmlConfigurationBuilder.doConfigure(SpringXmlConfigurationBuilder.java:73)
at org.mule.config.builders.AbstractConfigurationBuilder.configure(AbstractConfigurationBuilder.java:47)
at org.mule.config.builders.AbstractResourceConfigurationBuilder.configure(AbstractResourceConfigurationBuilder.java:78)
at org.mule.config.builders.AutoConfigurationBuilder.autoConfigure(AutoConfigurationBuilder.java:101)
at org.mule.config.builders.AutoConfigurationBuilder.doConfigure(AutoConfigurationBuilder.java:57)
at org.mule.config.builders.AbstractConfigurationBuilder.configure(AbstractConfigurationBuilder.java:47)
at org.mule.config.builders.AbstractResourceConfigurationBuilder.configure(AbstractResourceConfigurationBuilder.java:78)
at org.mule.context.DefaultMuleContextFactory.createMuleContext(DefaultMuleContextFactory.java:79)
at org.mule.MuleServer.initialize(MuleServer.java:377)
at org.mule.MuleServer.run(MuleServer.java:267)
at org.mule.MuleServer.start(MuleServer.java:254)
at org.mule.MuleServer.main(MuleServer.java:127)
Caused by: org.springframework.beans.factory.xml.XmlBeanDefinitionStoreException: Line 50 in XML document from URL [file:/D:/workspace_eclipse/ActiveMQ_Test/conf/mule-config.xml] is invalid; nested exception is org.xml.sax.SAXParseException: The prefix "p" for attribute "p:queuePrefix" associated with an element type "spring:bean" is not bound.
at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.doLoadBeanDefinitions(XmlBeanDefinitionReader.java:396)
at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.loadBeanDefinitions(XmlBeanDefinitionReader.java:334)
at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.loadBeanDefinitions(XmlBeanDefinitionReader.java:302)
at org.springframework.beans.factory.support.AbstractBeanDefinitionReader.loadBeanDefinitions(AbstractBeanDefinitionReader.java:143)
at org.mule.config.spring.MuleApplicationContext.loadBeanDefinitions(MuleApplicationContext.java:107)
at org.springframework.context.support.AbstractRefreshableApplicationContext.refreshBeanFactory(AbstractRefreshableApplicationContext.java:130)
at org.springframework.context.support.AbstractApplicationContext.obtainFreshBeanFactory(AbstractApplicationContext.java:467)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:397)
at org.mule.config.spring.SpringRegistry.doInitialise(SpringRegistry.java:100)
at org.mule.registry.AbstractRegistry.initialise(AbstractRegistry.java:107)
... 13 more
Caused by: org.xml.sax.SAXParseException: The prefix "p" for attribute "p:queuePrefix" associated with an element type "spring:bean" is not bound.
at com.sun.org.apache.xerces.internal.util.ErrorHandlerWrapper.createSAXParseException(ErrorHandlerWrapper.java:195)
at com.sun.org.apache.xerces.internal.util.ErrorHandlerWrapper.fatalError(ErrorHandlerWrapper.java:174)
at com.sun.org.apache.xerces.internal.impl.XMLErrorReporter.reportError(XMLErrorReporter.java:388)
at com.sun.org.apache.xerces.internal.impl.XMLErrorReporter.reportError(XMLErrorReporter.java:318)
at com.sun.org.apache.xerces.internal.impl.XMLNSDocumentScannerImpl.scanStartElement(XMLNSDocumentScannerImpl.java:334)
at com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl$FragmentContentDriver.next(XMLDocumentFragmentScannerImpl.java:2755)
at com.sun.org.apache.xerces.internal.impl.XMLDocumentScannerImpl.next(XMLDocumentScannerImpl.java:648)
at com.sun.org.apache.xerces.internal.impl.XMLNSDocumentScannerImpl.next(XMLNSDocumentScannerImpl.java:140)
at com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl.scanDocument(XMLDocumentFragmentScannerImpl.java:511)
at com.sun.org.apache.xerces.internal.parsers.XML11Configuration.parse(XML11Configuration.java:808)
at com.sun.org.apache.xerces.internal.parsers.XML11Configuration.parse(XML11Configuration.java:737)
at com.sun.org.apache.xerces.internal.parsers.XMLParser.parse(XMLParser.java:119)
at com.sun.org.apache.xerces.internal.parsers.DOMParser.parse(DOMParser.java:235)
at com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderImpl.parse(DocumentBuilderImpl.java:284)
at org.springframework.beans.factory.xml.DefaultDocumentLoader.loadDocument(DefaultDocumentLoader.java:75)
at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.doLoadBeanDefinitions(XmlBeanDefinitionReader.java:388)
... 22 more
**
How can I fix it?
**
Stevy
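For reference, the SAXParseException above says the "p" prefix is not bound, which generally means the Spring p-namespace is not declared on the root element. A minimal sketch of that declaration follows; it matches the one used in the full configuration posted later in this thread. The other xmlns declarations and xsi:schemaLocation are omitted here for brevity, and the p-namespace itself needs no schemaLocation entry.

```xml
<!-- Sketch: declare the Spring "p" namespace on the root <mule> element.
     Other xmlns declarations and xsi:schemaLocation are omitted here. -->
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:spring="http://www.springframework.org/schema/beans"
      xmlns:p="http://www.springframework.org/schema/p">

    <spring:beans>
        <!-- p:queuePrefix now resolves because the prefix is bound above -->
        <spring:bean id="AmqDeadLetterStrategy"
            class="org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy"
            p:queuePrefix="DLQ." />
    </spring:beans>

</mule>
```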
Hmm, JMS can't process a HashMap? See the output below. (I have a transformer on my inbound endpoint that returns a List<HashMap>, and I want to send those values to a JMS outbound endpoint.)
ERROR 2011-03-06 21:29:05,792 [EsbJmsConnector.dispatcher.1] org.mule.exception.DefaultServiceExceptionStrategy:
********************************************************************************
Message : Invalid type passed to StreamMessage: HashMap . Allowed types are: Boolean, Byte, Short, Character, Integer, Long, Float, Double,String and byte[] (javax.jms.MessageFormatException)
Code : MULE_ERROR-64999
--------------------------------------------------------------------------------
Exception stack is:
1. Invalid type passed to StreamMessage: HashMap . Allowed types are: Boolean, Byte, Short, Character, Integer, Long, Float, Double,String and byte[](JMS Code: null) (javax.jms.MessageFormatException)
org.mule.transport.jms.JmsMessageUtils:211 (http://java.sun.com/j2ee/sdk_1.3/techdocs/api/javax/jms/MessageFormatException.html)
2. Invalid type passed to StreamMessage: HashMap . Allowed types are: Boolean, Byte, Short, Character, Integer, Long, Float, Double,String and byte[] (javax.jms.MessageFormatException) (org.mule.api.transformer.TransformerException)
org.mule.transport.jms.transformers.AbstractJmsTransformer:75 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transformer/TransformerException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
javax.jms.MessageFormatException: Invalid type passed to StreamMessage: HashMap . Allowed types are: Boolean, Byte, Short, Character, Integer, Long, Float, Double,String and byte[]
at org.mule.transport.jms.JmsMessageUtils.listToMessage(JmsMessageUtils.java:211)
at org.mule.transport.jms.JmsMessageUtils.toMessage(JmsMessageUtils.java:132)
at org.mule.transport.jms.transformers.AbstractJmsTransformer.transformToMessage(AbstractJmsTransformer.java:67)
+ 3 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************
See you soon,
**
Stevy
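The stack trace shows the failure in JmsMessageUtils.listToMessage, so the List payload is apparently being mapped to a JMS StreamMessage, which only accepts the primitive types named in the message. One possible way around it, offered as an assumption rather than a confirmed fix, is to put the raw FTP payload on the queue and apply the CSV2Maps transformer only on the consuming side, which is how the full configuration posted below is arranged:

```xml
<!-- Producer side: no CSV2Maps here, so the queue carries the raw file content -->
<jms:outbound-endpoint queue="jobs.essai" connector-ref="EsbJmsConnector" />

<!-- Consumer side: convert to a List of Maps only after the message has left JMS -->
<jms:inbound-endpoint queue="jobs.essai" connector-ref="EsbJmsConnector"
    transformer-refs="CSV2Maps" />
```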
**
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:ftp="http://www.mulesoft.org/schema/mule/ftp" xmlns:jdbc="http://www.mulesoft.org/schema/mule/jdbc"
xmlns:stdio="http://www.mulesoft.org/schema/mule/stdio" xmlns:tcp="http://www.mulesoft.org/schema/mule/tcp"
xmlns:spring="http://www.springframework.org/schema/beans" xmlns:file="http://www.mulesoft.org/schema/mule/file"
xmlns:vm="http://www.mulesoft.org/schema/mule/vm" xmlns:jms="http://www.mulesoft.org/schema/mule/jms"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/ftp http://www.mulesoft.org/schema/mule/ftp/3.1/mule-ftp.xsd
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/3.1/mule-file.xsd
http://www.mulesoft.org/schema/mule/jdbc http://www.mulesoft.org/schema/mule/jdbc/3.1/mule-jdbc.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/3.1/mule.xsd
http://www.mulesoft.org/schema/mule/stdio http://www.mulesoft.org/schema/mule/stdio/3.1/mule-stdio.xsd
http://www.mulesoft.org/schema/mule/tcp http://www.mulesoft.org/schema/mule/tcp/3.1/mule-tcp.xsd
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/3.1/mule-vm.xsd
http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/3.1/mule-jms.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<!--
FTP global properties
-->
<global-property name="ftp.host" value="localhost" />
<global-property name="ftp.username" value="root" />
<global-property name="ftp.password" value="pwd" />
<!--
This data source is used to connect to the database
-->
<spring:bean id="jdbcDataSource"
class="org.enhydra.jdbc.standard.StandardDataSource" destroy-method="shutdown">
<spring:property name="driverName" value="com.mysql.jdbc.Driver" />
<spring:property name="url"
value="jdbc:mysql://localhost:3306/db" />
<spring:property name="user" value="root" />
<spring:property name="password" value="pwd" />
</spring:bean>
<spring:beans>
<spring:bean id="AmqDeadLetterStrategy"
class="org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy"
p:queuePrefix="DLQ." />
<spring:bean id="AmqDefaultPolicyEntry"
class="org.apache.activemq.broker.region.policy.PolicyEntry" p:queue="jobs.*"
p:deadLetterStrategy-ref="AmqDeadLetterStrategy" />
<spring:bean id="AmqPolicyMap"
class="org.apache.activemq.broker.region.policy.PolicyMap"
p:defaultEntry-ref="AmqDefaultPolicyEntry" />
<spring:bean name="AmqBroker"
class="org.apache.activemq.broker.BrokerService" p:brokerName="esb-amq-broker"
p:persistent="true" p:dataDirectory="${activemq.data.dir}" p:useJmx="true"
p:useShutdownHook="false" p:destinationPolicy-ref="AmqPolicyMap"
init-method="start" destroy-method="stop" />
<spring:bean name="AmqRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"
p:useExponentialBackOff="true" />
<spring:bean name="AmqConnectionFactory"
class="org.apache.activemq.spring.ActiveMQConnectionFactory"
p:brokerURL="vm://esb-amq-broker" p:redeliveryPolicy-ref="AmqRedeliveryPolicy"
depends-on="AmqBroker" />
</spring:beans>
<jms:activemq-connector name="EsbJmsConnector" maxRedelivery="1"
specification="1.1" connectionFactory-ref="AmqConnectionFactory"
disableTemporaryReplyToDestinations="true" persistentDelivery="true">
</jms:activemq-connector>
<!--
This JDBC connector specifies the queries to run and what data source
to use.
-->
<jdbc:connector name="jdbcConnector" dataSource-ref="jdbcDataSource"
validateConnections="true">
<jdbc:query key="insertCDRIntoDB"
value="insert into cdr
(Test1, Test2, Test3)
values
(#[map-payload:Test1], #[map-payload:Test2], #[map-payload:Test3])" />
</jdbc:connector>
<ftp:connector name="FTPconnector" passive="false"
pollingFrequency="1000" validateConnections="true">
</ftp:connector>
<!--
This custom transformer will take CSV data and convert it to a map,
based on the format provided in a mapping file.
-->
<custom-transformer name="CSV2Maps"
class="com.mvno.muleesb.transformer.My_csv_to_maps_transformer" />
<model>
<service name="CSVLoaderAndInsertToDB">
<description>This flow takes CSV files from an FTP server
and puts them on the jobs.essai JMS queue.
</description>
<inbound>
<ftp:inbound-endpoint host="localhost" user="asr"
password="pwd" passive="false" port="21" path="/Path"
connector-ref="FTPconnector">
<file:filename-wildcard-filter
pattern="*.csv" />
</ftp:inbound-endpoint>
</inbound>
<outbound>
<pass-through-router>
<jms:outbound-endpoint queue="jobs.essai"
connector-ref="EsbJmsConnector" >
<jms:transaction action="ALWAYS_BEGIN"/>
<!--<xa-transaction action="ALWAYS_BEGIN"/>-->
</jms:outbound-endpoint>
</pass-through-router>
</outbound>
</service>
<service name="CSVLoaderAndInsertToDB_2">
<description>This flow consumes CSV content from the jobs.essai JMS queue,
parses it
and inserts the records into the database.
</description>
<inbound>
<jms:inbound-endpoint queue="jobs.essai"
connector-ref="EsbJmsConnector" transformer-refs="CSV2Maps" >
<jms:transaction action="JOIN_IF_POSSIBLE"/>
<!--<jdbc:transaction action="ALWAYS_BEGIN" />-->
<!--<xa-transaction action="JOIN_IF_POSSIBLE"/>-->
</jms:inbound-endpoint>
</inbound>
<outbound>
<list-message-splitter-router>
<jdbc:outbound-endpoint connector-ref="jdbcConnector"
queryKey="insertCDRIntoDB">
<payload-type-filter expectedType="java.util.Map" />
<jdbc:transaction action="BEGIN_OR_JOIN" />
<!--<jms:transaction action="JOIN_IF_POSSIBLE"/>-->
</jdbc:outbound-endpoint>
<payload-type-filter expectedType="java.util.List" />
</list-message-splitter-router>
</outbound>
<!--<default-exception-strategy>
<outbound-endpoint
address="stdio://OUT" />
<vm:outbound-endpoint address="vm://esb-amq-broker"
connector-ref="vmConnector" />
</default-exception-strategy>-->
</service>
</model>
</mule>
**
How can I modify this configuration so that the inserts are performed once the database service restarts?
Thx,
"Why do you need a retry policy?" Wasn't it you who advised me to use a redelivery policy? Or did I misunderstand you?
Thx for your replies,
**
Property Name               Type      Description
backOffMultiplier           short
collisionAvoidancePercent   short
initialRedeliveryDelay      long
maximumRedeliveries         int
useCollisionAvoidance       boolean
useExponentialBackOff       boolean
**
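As a rough illustration of how the properties listed above could be set on the AmqRedeliveryPolicy bean from the earlier configuration, here is a minimal sketch. The numeric values are assumptions chosen for the example, not recommendations, and the p: prefix assumes the Spring p-namespace is declared on the root element.

```xml
<!-- Illustrative values only; tune them for your broker and workload -->
<spring:bean name="AmqRedeliveryPolicy"
    class="org.apache.activemq.RedeliveryPolicy"
    p:initialRedeliveryDelay="5000"
    p:maximumRedeliveries="10"
    p:useExponentialBackOff="true"
    p:backOffMultiplier="2" />
```

Note that the EsbJmsConnector in the configuration above also sets maxRedelivery="1", which may cap redeliveries on the Mule side independently of this broker-side policy.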
Thx,
I have ActiveMQ 5.4.2 and I tried your config. I think the property is "redeliveryDelay" rather than "deliveryDelay".
But there are still no redeliveries :(
**
</service>
</model>
</mule>
**
**