Showing posts with label Oracle AQ. Show all posts
Showing posts with label Oracle AQ. Show all posts

Tuesday, 29 March 2011

Setting up Active MQ to Bridge to Oracle AQ

I've recently been working on migrating away from Oracle Advanced Queues to Active MQ, and I have opted for a gradual migration strategy. As part of this I have looked into creating a JMS bridge, which is surprisingly easy to set up in Active MQ.

This post shows how to set up queues and routing for the following tests

A) Send message to AQ which gets Moved to an Active MQ queue

A1 -> A2

B) Send message to AQ, throw an exception when dequeuing the message to push it into the exception queue, from where it is then moved to an Active MQ queue

B1 -> B1_E -> B3

C) Send message to MQ which then gets moved to AQ

C1 -> C2
C)

First create the queues under a new user called qtest in Oracle


-- Queue A1: AQ queue for test A (messages sent here are moved on to Active MQ).
-- Every step is an anonymous PL/SQL block terminated by "/". The original used
-- EXECUTE followed by "/": SQL*Plus does not keep EXECUTE in the SQL buffer, so
-- that "/" re-ran the previous block (CREATE_QUEUE) and failed because the
-- queue already existed.

-- 1. Queue table holding JMS text messages, ordered by enqueue time.
BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'qtest.TEST_A1_QT',
    queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE',
    sort_list          => 'ENQ_TIME',
    compatible         => '8.1.3');
END;
/

-- 2. The queue itself.
BEGIN
  DBMS_AQADM.CREATE_QUEUE(
    queue_name          => 'qtest.TEST_A1',
    queue_table         => 'qtest.TEST_A1_QT',
    queue_type          => 0,          -- 0 = DBMS_AQADM.NORMAL_QUEUE
    max_retries         => 4,
    retry_delay         => 60,         -- seconds between retries
    retention_time      => 220752000,  -- seconds (2555 days)
    dependency_tracking => FALSE);
END;
/

-- 3. Enable the queue (start_queue defaults enable both enqueue and dequeue).
BEGIN
  DBMS_AQADM.START_QUEUE(queue_name => 'qtest.TEST_A1');
END;
/

-- COMMIT kept from the original script; a ";" alone executes it, so no "/".
COMMIT;

-- Queue B1: AQ queue for test B, plus its exception queue (messages that fail
-- dequeue are pushed to the exception queue and bridged from there).
-- Every step is an anonymous PL/SQL block terminated by "/". The original used
-- EXECUTE followed by "/": SQL*Plus does not keep EXECUTE in the SQL buffer, so
-- that "/" re-ran the previous block (CREATE_QUEUE) and failed because the
-- queue already existed.

-- 1. Queue table holding JMS text messages, ordered by enqueue time.
BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'qtest.TEST_B1_QT',
    queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE',
    sort_list          => 'ENQ_TIME',
    compatible         => '8.1.3');
END;
/

-- 2. The queue itself.
BEGIN
  DBMS_AQADM.CREATE_QUEUE(
    queue_name          => 'qtest.TEST_B1',
    queue_table         => 'qtest.TEST_B1_QT',
    queue_type          => 0,          -- 0 = DBMS_AQADM.NORMAL_QUEUE
    max_retries         => 4,
    retry_delay         => 60,         -- seconds between retries
    retention_time      => 220752000,  -- seconds (2555 days)
    dependency_tracking => FALSE);
END;
/

-- 3. Enable the queue (start_queue defaults enable both enqueue and dequeue).
BEGIN
  DBMS_AQADM.START_QUEUE(queue_name => 'qtest.TEST_B1');
END;
/

-- 4. Enable the queue table's exception queue for dequeue only.
--    The name is schema-qualified like every other object in this script so it
--    also resolves when the script is run by a user other than qtest.
BEGIN
  DBMS_AQADM.START_QUEUE(
    queue_name => 'qtest.AQ$_TEST_B1_QT_E',
    enqueue    => FALSE,
    dequeue    => TRUE);
END;
/

-- COMMIT kept from the original script; a ";" alone executes it, so no "/".
COMMIT;

-- Queue C1: AQ queue created for test C.
-- Every step is an anonymous PL/SQL block terminated by "/". The original used
-- EXECUTE followed by "/": SQL*Plus does not keep EXECUTE in the SQL buffer, so
-- that "/" re-ran the previous block (CREATE_QUEUE) and failed because the
-- queue already existed.

-- 1. Queue table holding JMS text messages, ordered by enqueue time.
BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'qtest.TEST_C1_QT',
    queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE',
    sort_list          => 'ENQ_TIME',
    compatible         => '8.1.3');
END;
/

-- 2. The queue itself.
BEGIN
  DBMS_AQADM.CREATE_QUEUE(
    queue_name          => 'qtest.TEST_C1',
    queue_table         => 'qtest.TEST_C1_QT',
    queue_type          => 0,          -- 0 = DBMS_AQADM.NORMAL_QUEUE
    max_retries         => 4,
    retry_delay         => 60,         -- seconds between retries
    retention_time      => 220752000,  -- seconds (2555 days)
    dependency_tracking => FALSE);
END;
/

-- 3. Enable the queue (start_queue defaults enable both enqueue and dequeue).
BEGIN
  DBMS_AQADM.START_QUEUE(queue_name => 'qtest.TEST_C1');
END;
/

-- COMMIT kept from the original script; a ";" alone executes it, so no "/".
COMMIT;

-- Queue C2: AQ queue created for test C.
-- Every step is an anonymous PL/SQL block terminated by "/". The original used
-- EXECUTE followed by "/": SQL*Plus does not keep EXECUTE in the SQL buffer, so
-- that "/" re-ran the previous block (CREATE_QUEUE) and failed because the
-- queue already existed.

-- 1. Queue table holding JMS text messages, ordered by enqueue time.
BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'qtest.TEST_C2_QT',
    queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE',
    sort_list          => 'ENQ_TIME',
    compatible         => '8.1.3');
END;
/

-- 2. The queue itself.
BEGIN
  DBMS_AQADM.CREATE_QUEUE(
    queue_name          => 'qtest.TEST_C2',
    queue_table         => 'qtest.TEST_C2_QT',
    queue_type          => 0,          -- 0 = DBMS_AQADM.NORMAL_QUEUE
    max_retries         => 4,
    retry_delay         => 60,         -- seconds between retries
    retention_time      => 220752000,  -- seconds (2555 days)
    dependency_tracking => FALSE);
END;
/

-- 3. Enable the queue (start_queue defaults enable both enqueue and dequeue).
BEGIN
  DBMS_AQADM.START_QUEUE(queue_name => 'qtest.TEST_C2');
END;
/

-- COMMIT kept from the original script; a ";" alone executes it, so no "/".
COMMIT;


Next edit the Active MQ configuration conf/activemq.xml and at the end of the config before the closing beans tag add the oracle connection and factories.



jdbc:oracle:thin:@(DESCRIPTION = (ADDRESS_LIST = (ADDRESS = (PROTOCOL = TCP)(HOST = localhost)(PORT = 1521)))(CONNECT_DATA = (SERVER = DEDICATED)(SERVICE_NAME = xe)))











qtest


qtest








Finally add the routing in the camel section of Active MQ config

















Thursday, 24 March 2011

Enabling AQ Exception Queue

Had to spend a little while searching for this one so added here for quick reference

By default, exception queues are disabled for both enqueue and dequeue. To enable them you need to call the start_queue procedure. However, Oracle prevents enqueuing to exception queues, so you must explicitly state that enqueue is disabled (FALSE) and dequeue is enabled (TRUE):


-- Enable an exception queue for dequeue only (enqueue must stay FALSE).
-- Written as an anonymous block terminated by "/" rather than EXECUTE + "/":
-- SQL*Plus does not keep EXECUTE in the SQL buffer, so a "/" after it runs
-- whatever earlier statement is still buffered instead.
BEGIN
  DBMS_AQADM.START_QUEUE(
    queue_name => 'AQ$_TEST_Q_IN_QT_E',
    enqueue    => FALSE,
    dequeue    => TRUE);
END;
/

-- COMMIT kept from the original snippet; a ";" alone executes it, so no "/".
COMMIT;

Creating an AQ queue

-- Create, start and expose the exception queue of MYQUEUES.TEST_Q_IN.
-- Every step is an anonymous PL/SQL block terminated by "/". The original used
-- EXECUTE followed by "/": SQL*Plus does not keep EXECUTE in the SQL buffer, so
-- that "/" re-ran the most recent buffered statement instead.

-- 1. Queue table holding JMS text messages, ordered by enqueue time.
BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'MYQUEUES.TEST_Q_IN_QT',
    queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE',
    sort_list          => 'ENQ_TIME',
    compatible         => '8.1.3');
END;
/

-- 2. The queue itself.
BEGIN
  DBMS_AQADM.CREATE_QUEUE(
    queue_name          => 'MYQUEUES.TEST_Q_IN',
    queue_table         => 'MYQUEUES.TEST_Q_IN_QT',
    queue_type          => 0,          -- 0 = DBMS_AQADM.NORMAL_QUEUE
    max_retries         => 4,
    retry_delay         => 60,         -- seconds between retries
    retention_time      => 220752000,  -- seconds (2555 days)
    dependency_tracking => FALSE);
END;
/

-- 3. Enable the queue (start_queue defaults enable both enqueue and dequeue).
BEGIN
  DBMS_AQADM.START_QUEUE(queue_name => 'MYQUEUES.TEST_Q_IN');
END;
/

-- 4. Enable the queue table's exception queue for dequeue only.
--    Schema-qualified like the other objects above so the call also resolves
--    when the script is run by a user other than MYQUEUES.
BEGIN
  DBMS_AQADM.START_QUEUE(
    queue_name => 'MYQUEUES.AQ$_TEST_Q_IN_QT_E',
    enqueue    => FALSE,
    dequeue    => TRUE);
END;
/

-- Single COMMIT at the end; a ";" alone executes it, so no "/".
COMMIT;

Thursday, 18 November 2010

Re-Enqueuing Oracle AQ Message

I recently had an issue with AQ messages which would not dequeue following a weird Oracle error. The messages would only dequeue in Browse mode and therefore would never get processed or leave the queue.

To resolve this issue and get my messages to the system that needed to process them, I wrote the following PL/SQL to read the message and re-enqueue it as a new message, which was then processed:


-- Re-enqueue a stuck AQ message.
--
-- Reads one message from the queue in BROWSE mode (so the original stays on the
-- queue), prints some of its JMS properties and its text, then builds a new
-- JMS text message carrying the same text and enqueues it on the same queue.
--
-- Edit the literals below (msgid to browse, queue name, new msgId property and
-- correlation id) before running. Output goes through DBMS_OUTPUT, so enable
-- SERVEROUTPUT to see it. No COMMIT is issued here and the enqueue options are
-- left at their defaults, so commit the session afterwards to publish the
-- new message.
DECLARE
  v_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
  v_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
  v_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;  -- properties of the browsed message
  v_message_prop_en    DBMS_AQ.MESSAGE_PROPERTIES_T;  -- properties for the new message
  v_message_handle     RAW(16);                       -- msgid of the browsed message
  v_msgid              RAW(16);                       -- msgid assigned to the new message
  v_message            SYS.AQ$_JMS_TEXT_MESSAGE;      -- browsed payload
  v_message_en         SYS.AQ$_JMS_TEXT_MESSAGE;      -- new payload
  v_alert_msg          CLOB;                          -- text body copied across
  v_alert_msg_id       CHAR(36);                      -- value for the new 'msgId' property
  v_corrmsg_id_in      CHAR(36);                      -- correlation id for the new message
  v_queue_msg_name     VARCHAR2(50);
BEGIN
  -- Browse (do not remove) the specific message identified by msgid.
  v_dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;
  v_dequeue_options.wait         := 2; --DBMS_AQ.NO_WAIT;
  v_dequeue_options.navigation   := DBMS_AQ.FIRST_MESSAGE;
  v_dequeue_options.msgid        := '93E04404404463F7E04400144FDCA7C6';

  v_queue_msg_name := 'MYAPP.MYAPP_WORK_IN';
  v_alert_msg_id   := '5bc0352d-4578-4578-b938-bd457841763b';
  v_corrmsg_id_in  := '9af66884-d74c-af66-af66-479be7e7d772';

  -- Dequeue message from the queue
  DBMS_AQ.DEQUEUE(queue_name         => v_queue_msg_name,
                  dequeue_options    => v_dequeue_options,
                  message_properties => v_message_properties,
                  payload            => v_message,
                  msgid              => v_message_handle);

  v_message.get_text(v_alert_msg);

  /* Now display some of the information. */
  DBMS_OUTPUT.PUT_LINE('Dequeued msg id is ' || RAWTOHEX(v_message_handle));
  DBMS_OUTPUT.PUT_LINE('MsgId: ' || v_message.get_string_property('msgId'));
  DBMS_OUTPUT.PUT_LINE('MsgType: ' || v_message.get_string_property('msgType'));
  DBMS_OUTPUT.PUT_LINE('numTags: ' || v_message.get_int_property('numTags'));

  -- Output Message
  DBMS_OUTPUT.PUT_LINE(v_alert_msg);

  -- Create a new Message
  v_message_en := SYS.AQ$_JMS_TEXT_MESSAGE.CONSTRUCT;
  v_message_prop_en.correlation := v_corrmsg_id_in;

  -- Setting user defined message properties
  v_message_en.set_int_property('numTags', 0);
  v_message_en.set_string_property('msgType', v_message.get_string_property('msgType'));
  v_message_en.set_string_property('msgProvider', v_message.get_string_property('msgProvider'));
  v_message_en.set_string_property('msgId', v_alert_msg_id);

  -- Set Message Payload
  v_message_en.set_text(v_alert_msg);

  -- Enqueue this message into AQ queue using DBMS_AQ package.
  -- msgid is an OUT RAW parameter: bind it to its own RAW variable so the
  -- application-level id in v_alert_msg_id is not overwritten by the call.
  DBMS_AQ.ENQUEUE(queue_name         => v_queue_msg_name,
                  enqueue_options    => v_enqueue_options,
                  message_properties => v_message_prop_en,
                  payload            => v_message_en,
                  msgid              => v_msgid);

  DBMS_OUTPUT.PUT_LINE('Message ' || v_alert_msg_id || ' sent');
  DBMS_OUTPUT.PUT_LINE('Enqueued msg id is ' || RAWTOHEX(v_msgid));
END;
/

Wednesday, 17 February 2010

Moving Exception Messages back onto the normal AQ queue

Just wrote some PL/SQL to restore messages on the exception queue back onto the normal queue.


-- Move every message from an AQ exception queue back onto its normal queue.
--
-- Enables the exception queue for dequeue, then loops: remove one message from
-- the exception queue and enqueue its payload (with the dequeued message
-- properties) onto the target queue. The loop ends when the dequeue raises
-- ORA-25228 or ORA-25263, at which point the moved messages are committed.
-- Any other error propagates to the caller without a commit.
-- NOTE(review): the dequeued message_properties are reused unchanged for the
-- enqueue; confirm that carried-over values (e.g. expiration/delay) are wanted.
DECLARE
  dequeue_options    dbms_aq.dequeue_options_t;
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            SYS.AQ$_JMS_TEXT_MESSAGE;

  deq_qname varchar2(50);
  enq_qname varchar2(50);

  ex_no_messages exception;
  ex_dequeue     exception;
  pragma exception_init (ex_no_messages, -25263);
  pragma exception_init (ex_dequeue, -25228);
  msg_count number;
BEGIN
  deq_qname := 'SCOTT.AQ$_TBL_DATAFEED_REQ_E';
  enq_qname := 'SCOTT.DATAFEED_REQ';

  dequeue_options.wait       := DBMS_AQ.NO_WAIT;
  dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;
  -- REMOVE (not REMOVE_NODATA): the payload must be fetched, otherwise there is
  -- nothing to put back on the normal queue.
  dequeue_options.dequeue_mode := DBMS_AQ.REMOVE;

  msg_count := 0;

  -- Enabling the exception queue for dequeue
  dbms_aqadm.start_queue(deq_qname, false, true);

  LOOP --Looping to find and remove the message from exception queue
    dbms_aq.dequeue(queue_name         => deq_qname,
                    dequeue_options    => dequeue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);

    dbms_aq.enqueue(queue_name         => enq_qname,
                    enqueue_options    => enqueue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);

    msg_count := msg_count + 1;

    DBMS_OUTPUT.PUT_LINE ('Processed ' || msg_count || ' messages');

    -- After the first message, keep walking forward through the queue.
    dequeue_options.navigation := DBMS_AQ.NEXT_MESSAGE;
  END LOOP;

EXCEPTION -- exception handling
  -- Either error ends the loop; commit what was moved.
  WHEN ex_no_messages OR ex_dequeue THEN
    DBMS_OUTPUT.PUT_LINE ('No of Messages Moved: '||msg_count);
    COMMIT;
END;
/

Extracting data from a JMS Text message

Some useful SQL to extract data from a JMS Text message

-- Lists messages in MSGUSER.TBL_XMLMSG that are on the exception queue
-- (Q_NAME = 'AQ$_TBL_XMLMSG_E'), enqueued after 12-02-2010, and whose text does
-- not contain 'Tester'. Three values are cut out of the JMS text payload
-- (user_data.text_lob) with SUBSTR/INSTR, alongside the message metadata.
--
-- NOTE(review): every INSTR search string below is empty or a bare line break.
-- Presumably these were XML tags that were stripped when the snippet was
-- published. In Oracle '' is NULL, so those INSTR calls return NULL as written;
-- restore the original tag literals before use.
-- NOTE(review): the offsets +20 / +19 / +5 are presumably the lengths of the
-- missing opening tags -- confirm once the tags are restored.
-- NOTE(review): because of the Q_NAME filter, the "successful" column is
-- 'FAIL' for every row this query returns.
SELECT SUBSTR(q.user_data.text_lob, INSTR(q.user_data.text_lob,'') + 20,7) "Special Reference",
SUBSTR(q.user_data.text_lob,
INSTR(q.user_data.text_lob,'') + 19,
(INSTR(q.user_data.text_lob,'
')
- INSTR(q.user_data.text_lob,'') - 19)
) "Contact",
SUBSTR(q.user_data.text_lob,
INSTR(q.user_data.text_lob,'') + 5,
(INSTR(q.user_data.text_lob,'
')
- INSTR(q.user_data.text_lob,'') - 5)
) "REF"
,q.MSGID, q.CORRID, q.ENQ_TIME, q.DEQ_TIME, q.RETRY_COUNT,
CASE WHEN q.Q_NAME = 'AQ$_TBL_XMLMSG_E' THEN 'FAIL' ELSE 'PASS' END as "successful",
q.Q_NAME, length(q.user_data.text_lob) as "MSG Size", q.user_data.text_lob, q.user_data.text_vc
FROM MSGUSER.TBL_XMLMSG q
WHERE q.ENQ_TIME > to_date('12-02-2010 00:00:00', 'dd-mm-yyyy hh24-mi-ss')
AND q.Q_NAME = 'AQ$_TBL_XMLMSG_E'
AND INSTR(q.user_data.text_lob,'Tester') = 0 -- Exclude Test records
ORDER BY q.DEQ_TIME desc, q.ENQ_TIME