To resolve this issue and to get my messages to the system that needed to processes them I wrote the following SQL to read the message and re-enqueue it as a new message which was then processed:
DECLARE
queueopts DBMS_AQ.DEQUEUE_OPTIONS_T;
msgprops DBMS_AQ.MESSAGE_PROPERTIES_T;
v_dequeue_options DBMS_AQ.dequeue_options_t;
v_enqueue_options DBMS_AQ.enqueue_options_t;
v_message_properties DBMS_AQ.message_properties_t;
v_message_prop_en DBMS_AQ.message_properties_t;
v_message_handle RAW(16);
v_message sys.aq$_jms_text_message ;
v_message_provider_id number(8,2);
v_message_provider varchar2(50);
v_alert_msg_id char(36);
v_alert_msg_type varchar2(50);
v_num_attach number(3);
v_alert_msg clob;
v_alert_msg_id_in char(36);
v_alert_msg_id_deq char(36);
v_corrmsg_id_in char(36);
V_QUEUE_MSG_NAME VARCHAR2(50);
V_QUEUE_MSG_ID NUMBER(8,2);
V_MESSAGE_EN SYS.AQ$_JMS_TEXT_MESSAGE;
V_ENQUEUE_OPTIONS_EN DBMS_AQ.ENQUEUE_OPTIONS_T;
V_MSGID RAW(16);
V_MESSAGE_SIZE NUMBER(5);
BEGIN
v_dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;
v_dequeue_options.wait := 2; --DBMS_AQ.NO_WAIT;
v_dequeue_options.correlation := v_alert_msg_id_deq ;
v_dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;
-- v_dequeue_options.navigation := DBMS_AQ.NEXT_MESSAGE;
v_dequeue_options.msgid := '93E04404404463F7E04400144FDCA7C6';
V_QUEUE_MSG_NAME := 'MYAPP.MYAPP_WORK_IN';
v_alert_msg_id := '5bc0352d-4578-4578-b938-bd457841763b';
v_corrmsg_id_in := '9af66884-d74c-af66-af66-479be7e7d772';
-- Dequeue message from the queue
DBMS_AQ.DEQUEUE(queue_name => V_QUEUE_MSG_NAME,
dequeue_options => v_dequeue_options,
message_properties => v_message_properties,
payload => v_message,
msgid => v_message_handle);
V_MESSAGE.GET_TEXT(v_alert_msg);
/* Now display some of the information. */
DBMS_OUTPUT.PUT_LINE('Dequeued msg id is ' || RAWTOHEX (v_message_handle));
DBMS_OUTPUT.PUT_LINE('MsgId: ' || v_message.get_string_property('msgId'));
DBMS_OUTPUT.PUT_LINE('MsgType: ' || v_message.get_string_property('msgType'));
DBMS_OUTPUT.PUT_LINE('numTags: ' || v_message.get_int_property('numTags'));
-- Output Message
DBMS_OUTPUT.PUT_LINE(v_alert_msg);
-- Create a new Message
V_MESSAGE_EN := SYS.AQ$_JMS_TEXT_MESSAGE.CONSTRUCT;
v_message_prop_en.correlation := v_corrmsg_id_in;
-- Setting user defined message properties
V_MESSAGE_EN.SET_INT_PROPERTY('numTags', 0);
V_MESSAGE_EN.SET_STRING_PROPERTY('msgType' , v_message.get_string_property('msgType'));
V_MESSAGE_EN.SET_STRING_PROPERTY('msgProvider' , v_message.get_string_property('msgProvider'));
V_MESSAGE_EN.SET_STRING_PROPERTY('msgId' , v_alert_msg_id);
-- Set Message Payload
V_MESSAGE_EN.SET_TEXT(v_alert_msg);
-- Enqueue this message into AQ queue using DBMS_AQ package
DBMS_AQ.ENQUEUE(QUEUE_NAME => V_QUEUE_MSG_NAME,
ENQUEUE_OPTIONS => v_enqueue_options,
MESSAGE_PROPERTIES => v_message_prop_en,
PAYLOAD => V_MESSAGE_EN,
MSGID => v_alert_msg_id);
DBMS_OUTPUT.PUT_LINE('Message ' || v_alert_msg_id || ' sent');
END;
No comments:
Post a Comment