Thursday, 18 November 2010

Re-Enqueuing Oracle AQ Message

I recently had an issue with AQ messages which would not dequeue following a weird Oracle error. The messages would only dequeue in Browse mode and therefore would never get processed or leave the queue.

To resolve this issue and get my messages to the system that needed to process them, I wrote the following PL/SQL to read the message and re-enqueue it as a new message, which was then processed:


DECLARE
    -- Purpose: read one stuck JMS text message from an Oracle AQ queue in
    -- BROWSE mode and enqueue a copy of it on the same queue as a brand-new
    -- message, carrying over the text payload and selected user properties.

    -- Dequeue side
    v_dequeue_options    DBMS_AQ.dequeue_options_t;
    v_message_properties DBMS_AQ.message_properties_t;
    v_message_handle     RAW(16);                    -- AQ id of the message that was read
    v_message            SYS.aq$_jms_text_message;   -- message read from the queue
    v_alert_msg          CLOB;                       -- text payload of that message

    -- Enqueue side
    v_enqueue_options    DBMS_AQ.enqueue_options_t;
    v_message_prop_en    DBMS_AQ.message_properties_t;
    v_message_en         SYS.aq$_jms_text_message;   -- replacement message
    v_msgid              RAW(16);                    -- AQ id assigned to the replacement

    -- Values to edit for each message being re-enqueued
    v_queue_msg_name     VARCHAR2(50);
    v_alert_msg_id       CHAR(36);                   -- 'msgId' user property of the new message
    v_corrmsg_id_in      CHAR(36);                   -- correlation id of the new message
BEGIN
    v_queue_msg_name := 'MYAPP.MYAPP_WORK_IN';
    v_alert_msg_id   := '5bc0352d-4578-4578-b938-bd457841763b';
    v_corrmsg_id_in  := '9af66884-d74c-af66-af66-479be7e7d772';

    -- BROWSE reads the message without removing it from the queue.
    v_dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;
    -- Wait up to 2 seconds for the message (DBMS_AQ.NO_WAIT is the alternative).
    v_dequeue_options.wait         := 2;
    v_dequeue_options.navigation   := DBMS_AQ.FIRST_MESSAGE;
    -- Target the stuck message by its AQ message id. The id is a RAW value,
    -- so convert the hex string explicitly instead of relying on implicit
    -- conversion.
    v_dequeue_options.msgid        := HEXTORAW('93E04404404463F7E04400144FDCA7C6');

    -- Read the message from the queue
    DBMS_AQ.DEQUEUE(
        queue_name         => v_queue_msg_name,
        dequeue_options    => v_dequeue_options,
        message_properties => v_message_properties,
        payload            => v_message,
        msgid              => v_message_handle
    );

    -- Copy the text payload of the message into the CLOB
    v_message.get_text(v_alert_msg);

    -- Display some of the information that was read
    DBMS_OUTPUT.PUT_LINE('Dequeued msg id is ' || RAWTOHEX(v_message_handle));
    DBMS_OUTPUT.PUT_LINE('MsgId: ' || v_message.get_string_property('msgId'));
    DBMS_OUTPUT.PUT_LINE('MsgType: ' || v_message.get_string_property('msgType'));
    DBMS_OUTPUT.PUT_LINE('numTags: ' || v_message.get_int_property('numTags'));

    -- Output the payload.
    -- NOTE(review): the CLOB is implicitly converted to VARCHAR2 here; a very
    -- large payload would presumably fail this conversion - confirm if big
    -- messages are expected.
    DBMS_OUTPUT.PUT_LINE(v_alert_msg);

    -- Build the replacement message
    v_message_en := SYS.aq$_jms_text_message.construct;
    v_message_prop_en.correlation := v_corrmsg_id_in;

    -- User-defined properties: msgType and msgProvider are copied from the
    -- message that was read; numTags is reset to 0 and msgId is replaced.
    v_message_en.set_int_property('numTags', 0);
    v_message_en.set_string_property('msgType', v_message.get_string_property('msgType'));
    v_message_en.set_string_property('msgProvider', v_message.get_string_property('msgProvider'));
    v_message_en.set_string_property('msgId', v_alert_msg_id);

    -- Payload is the text that was read
    v_message_en.set_text(v_alert_msg);

    -- Enqueue the replacement. MSGID is an OUT parameter that returns the RAW
    -- AQ message id, so it is received in its own RAW variable; passing
    -- v_alert_msg_id here would overwrite the application message id.
    -- NOTE: with default enqueue options the enqueue is part of the current
    -- transaction, so the session must still COMMIT for it to take effect.
    DBMS_AQ.ENQUEUE(
        queue_name         => v_queue_msg_name,
        enqueue_options    => v_enqueue_options,
        message_properties => v_message_prop_en,
        payload            => v_message_en,
        msgid              => v_msgid
    );

    DBMS_OUTPUT.PUT_LINE('Enqueued msg id is ' || RAWTOHEX(v_msgid));
    DBMS_OUTPUT.PUT_LINE('Message ' || v_alert_msg_id || ' sent');
END;

No comments: