Tuesday, 29 March 2011

Setting up ActiveMQ to Bridge to Oracle AQ

I've recently been working on migrating from Oracle Advanced Queuing (AQ) to ActiveMQ, and I have opted for a gradual migration strategy. As part of this I looked into creating a JMS bridge, which is surprisingly easy to set up in ActiveMQ.

This post shows how to set up the queues and routing for the following tests:

A) Send a message to AQ, which gets moved to an ActiveMQ queue

A1 -> A2

B) Send a message to AQ, throw an exception when dequeuing it so that it is pushed into the exception queue, then move it to an ActiveMQ queue

B1 -> B1_E -> B3

C) Send a message to ActiveMQ, which then gets moved to AQ

C1 -> C2

First, create the queues under a new user called qtest in Oracle:


BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE( Queue_table => 'qtest.TEST_A1_QT', Queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE',
Sort_list => 'ENQ_TIME', Compatible => '8.1.3');
END;
/

BEGIN
DBMS_AQADM.CREATE_QUEUE( Queue_name => 'qtest.TEST_A1', Queue_table => 'qtest.TEST_A1_QT',
Queue_type => 0, Max_retries => 4, Retry_delay => 60, Retention_time => 220752000, dependency_tracking => FALSE);
END;
/

-- EXECUTE runs immediately; a trailing "/" in SQL*Plus would re-run the previous PL/SQL block
EXECUTE dbms_aqadm.start_queue (queue_name=>'qtest.TEST_A1');

commit;
/

BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE( Queue_table => 'qtest.TEST_B1_QT', Queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE',
Sort_list => 'ENQ_TIME', Compatible => '8.1.3');
END;
/

BEGIN
DBMS_AQADM.CREATE_QUEUE( Queue_name => 'qtest.TEST_B1', Queue_table => 'qtest.TEST_B1_QT',
Queue_type => 0, Max_retries => 4, Retry_delay => 60, Retention_time => 220752000, dependency_tracking => FALSE);
END;
/

EXECUTE dbms_aqadm.start_queue (queue_name=>'qtest.TEST_B1');

-- Exception queues can only be started for dequeue, not enqueue
EXECUTE dbms_aqadm.start_queue(queue_name => 'AQ$_TEST_B1_QT_E', enqueue => FALSE, dequeue => TRUE);

commit;
/

BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE( Queue_table => 'qtest.TEST_C1_QT', Queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE',
Sort_list => 'ENQ_TIME', Compatible => '8.1.3');
END;
/

BEGIN
DBMS_AQADM.CREATE_QUEUE( Queue_name => 'qtest.TEST_C1', Queue_table => 'qtest.TEST_C1_QT',
Queue_type => 0, Max_retries => 4, Retry_delay => 60, Retention_time => 220752000, dependency_tracking => FALSE);
END;
/

EXECUTE dbms_aqadm.start_queue (queue_name=>'qtest.TEST_C1');

commit;
/

BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE( Queue_table => 'qtest.TEST_C2_QT', Queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE',
Sort_list => 'ENQ_TIME', Compatible => '8.1.3');
END;
/

BEGIN
DBMS_AQADM.CREATE_QUEUE( Queue_name => 'qtest.TEST_C2', Queue_table => 'qtest.TEST_C2_QT',
Queue_type => 0, Max_retries => 4, Retry_delay => 60, Retention_time => 220752000, dependency_tracking => FALSE);
END;
/

EXECUTE dbms_aqadm.start_queue (queue_name=>'qtest.TEST_C2');

commit;
/


Next, edit the ActiveMQ configuration file conf/activemq.xml and, at the end of the config just before the closing beans tag, add the Oracle connection and connection factories.



JDBC URL: jdbc:oracle:thin:@(DESCRIPTION = (ADDRESS_LIST = (ADDRESS = (PROTOCOL = TCP)(HOST = localhost)(PORT = 1521)))(CONNECT_DATA = (SERVER = DEDICATED)(SERVICE_NAME = xe)))
Username: qtest
Password: qtest
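
The bean definitions for this step might look roughly like the sketch below. It assumes the Oracle AQ client jars (the AQ JMS API and the Oracle JDBC driver) are already on ActiveMQ's classpath, and that the password matches the qtest user name. The bean ids connectionFactoryOracleAQ, oracleQueueCredentials and oracleQueue are illustrative names chosen for this example, not necessarily the ones from the original config.

```xml
<!-- Sketch only: bean ids are illustrative, adjust the URL and credentials to your database -->

<!-- Oracle AQ JMS connection factory, built by the static
     AQjmsFactory.getQueueConnectionFactory(jdbcUrl, properties) method -->
<bean id="connectionFactoryOracleAQ" class="oracle.jms.AQjmsFactory"
      factory-method="getQueueConnectionFactory">
  <constructor-arg index="0"
      value="jdbc:oracle:thin:@(DESCRIPTION = (ADDRESS_LIST = (ADDRESS = (PROTOCOL = TCP)(HOST = localhost)(PORT = 1521)))(CONNECT_DATA = (SERVER = DEDICATED)(SERVICE_NAME = xe)))"/>
  <constructor-arg index="1">
    <!-- No extra connection properties -->
    <props/>
  </constructor-arg>
</bean>

<!-- Wraps the factory so that every connection is opened as the qtest user -->
<bean id="oracleQueueCredentials"
      class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
  <property name="targetConnectionFactory" ref="connectionFactoryOracleAQ"/>
  <property name="username" value="qtest"/>
  <property name="password" value="qtest"/>
</bean>

<!-- Camel JMS component backed by Oracle AQ; its id becomes the URI scheme in routes -->
<bean id="oracleQueue" class="org.apache.camel.component.jms.JmsComponent">
  <property name="connectionFactory" ref="oracleQueueCredentials"/>
</bean>
```

Keeping the credentials in a separate adapter bean means the AQ factory itself stays free of user details, so the same factory could be reused with a different account if needed.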

Finally, add the routing to the Camel section of the ActiveMQ config.
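
As a rough sketch, the routes for the three tests could look like the following. It assumes a Camel JMS component for Oracle AQ registered under the hypothetical id oracleQueue, and an activemq component such as the one defined in the sample Camel configuration shipped with ActiveMQ. The ActiveMQ queue names TEST_A2, TEST_B3 and TEST_C1 are guesses based on the A2, B3 and C1 labels above; the Oracle queue names come from the scripts above and are assumed to resolve within the qtest schema.

```xml
<!-- Sketch only: component ids and ActiveMQ queue names are assumptions -->
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">

  <!-- Test A: Oracle AQ queue to ActiveMQ queue -->
  <route>
    <from uri="oracleQueue:queue:TEST_A1"/>
    <to uri="activemq:queue:TEST_A2"/>
  </route>

  <!-- Test B: Oracle AQ exception queue of TEST_B1 to ActiveMQ queue -->
  <route>
    <from uri="oracleQueue:queue:AQ$_TEST_B1_QT_E"/>
    <to uri="activemq:queue:TEST_B3"/>
  </route>

  <!-- Test C: ActiveMQ queue to Oracle AQ queue -->
  <route>
    <from uri="activemq:queue:TEST_C1"/>
    <to uri="oracleQueue:queue:TEST_C2"/>
  </route>

</camelContext>
```

Each route is independent, so the tests can be enabled one at a time while the migration progresses.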


Thursday, 24 March 2011

Enabling AQ Exception Queue

I had to spend a little while searching for this one, so I've added it here for quick reference.

By default, exception queues are disabled for both enqueue and dequeue, so to enable them you need to call the start_queue procedure. However, Oracle prevents enqueuing to exception queues, so you must explicitly state that enqueue is disabled (FALSE) and dequeue is enabled (TRUE):


EXECUTE dbms_aqadm.start_queue(queue_name => 'AQ$_TEST_Q_IN_QT_E', enqueue => FALSE, dequeue => TRUE);

commit;
/

Creating an AQ queue

BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE( Queue_table => 'MYQUEUES.TEST_Q_IN_QT', Queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE',
Sort_list => 'ENQ_TIME', Compatible => '8.1.3');
END;
/

commit;
/

BEGIN
DBMS_AQADM.CREATE_QUEUE( Queue_name => 'MYQUEUES.TEST_Q_IN', Queue_table => 'MYQUEUES.TEST_Q_IN_QT',
Queue_type => 0, Max_retries => 4, Retry_delay => 60, Retention_time => 220752000, dependency_tracking => FALSE);
END;
/

commit;
/

-- EXECUTE runs immediately; a trailing "/" in SQL*Plus would re-run the previous PL/SQL block
EXECUTE dbms_aqadm.start_queue (queue_name=>'MYQUEUES.TEST_Q_IN');

commit;
/

-- Exception queues can only be started for dequeue, not enqueue
EXECUTE dbms_aqadm.start_queue(queue_name => 'AQ$_TEST_Q_IN_QT_E', enqueue => FALSE, dequeue => TRUE);

commit;
/