programing

Azure 큐 저장소에서 개체 메시지 전달

starjava 2023. 5. 17. 22:19
반응형

Azure 큐 저장소에서 개체 메시지 전달

저는 애저 큐에 물체를 전달하는 방법을 찾고 있습니다.저는 이것을 할 방법을 찾을 수 없었습니다.

제가 본 바와 같이 문자열이나 바이트 배열을 전달할 수 있는데, 전달하는 객체에는 그다지 편안하지 않습니다.

사용자 지정 개체를 큐에 전달할 방법이 있습니까?

감사합니다!

다음 클래스를 예로 사용할 수 있습니다.

 [Serializable]
    public abstract class BaseMessage
    {
        public byte[] ToBinary()
        {
            BinaryFormatter bf = new BinaryFormatter();
            byte[] output = null;
            using (MemoryStream ms = new MemoryStream())
            {
                ms.Position = 0;
                bf.Serialize(ms, this);
                output = ms.GetBuffer();
            }
            return output;
        }

        public static T FromMessage<T>(CloudQueueMessage m)
        {
            byte[] buffer = m.AsBytes;
            T returnValue = default(T);
            using (MemoryStream ms = new MemoryStream(buffer))
            {
                ms.Position = 0;
                BinaryFormatter bf = new BinaryFormatter();
                returnValue = (T)bf.Deserialize(ms);
            }
            return returnValue;
        }
    }

그런 다음 StdQueue(강력하게 입력된 큐):

   public class StdQueue<T> where T : BaseMessage, new()
    {
        protected CloudQueue queue;

        public StdQueue(CloudQueue queue)
        {
            this.queue = queue;
        }

        public void AddMessage(T message)
        {
            CloudQueueMessage msg =
            new CloudQueueMessage(message.ToBinary());
            queue.AddMessage(msg);
        }

        public void DeleteMessage(CloudQueueMessage msg)
        {
            queue.DeleteMessage(msg);
        }

        public CloudQueueMessage GetMessage()
        {
            return queue.GetMessage(TimeSpan.FromSeconds(120));
        }
    }

그런 다음 기본 메시지를 상속하기만 하면 됩니다.

[Serializable]
public class ParseTaskMessage : BaseMessage
{
    public Guid TaskId { get; set; }

    public string BlobReferenceString { get; set; }

    public DateTime TimeRequested { get; set; }
}

메시지에 사용할 수 있는 대기열을 만듭니다.

CloudStorageAccount acc;
            if (!CloudStorageAccount.TryParse(connectionString, out acc))
            {
                throw new ArgumentOutOfRangeException("connectionString", "Invalid connection string was introduced!");
            }
            CloudQueueClient clnt = acc.CreateCloudQueueClient();
            CloudQueue queue = clnt.GetQueueReference(processQueue);
            queue.CreateIfNotExist();
            this._queue = new StdQueue<ParseTaskMessage>(queue);

이것이 도움이 되길 바랍니다!

뉴턴소프트를 사용하는 확장 방법입니다.제이손과 비동기

    public static async Task AddMessageAsJsonAsync<T>(this CloudQueue cloudQueue, T objectToAdd)
    {
        var messageAsJson = JsonConvert.SerializeObject(objectToAdd);
        var cloudQueueMessage = new CloudQueueMessage(messageAsJson);
        await cloudQueue.AddMessageAsync(cloudQueueMessage);
    }

저는 이 일반화 접근 방식을 좋아하지만 메시지에 넣을 모든 클래스에 직렬화 특성을 적용하고 기본 클래스에서 파생해야 하는 것을 좋아하지 않습니다(저도 이미 기본 클래스가 있을 수 있습니다). 그래서...

using System;
using System.Text;
using Microsoft.WindowsAzure.Storage.Queue;
using Newtonsoft.Json;

namespace Example.Queue
{
    public static class CloudQueueMessageExtensions
    {
        public static CloudQueueMessage Serialize(Object o)
        {
            var stringBuilder = new StringBuilder();
            stringBuilder.Append(o.GetType().FullName);
            stringBuilder.Append(':');
            stringBuilder.Append(JsonConvert.SerializeObject(o));
            return new CloudQueueMessage(stringBuilder.ToString());
        }

        public static T Deserialize<T>(this CloudQueueMessage m)
        {
            int indexOf = m.AsString.IndexOf(':');

            if (indexOf <= 0)
                throw new Exception(string.Format("Cannot deserialize into object of type {0}", 
                    typeof (T).FullName));

            string typeName = m.AsString.Substring(0, indexOf);
            string json = m.AsString.Substring(indexOf + 1);

            if (typeName != typeof (T).FullName)
            {
                throw new Exception(string.Format("Cannot deserialize object of type {0} into one of type {1}", 
                    typeName,
                    typeof (T).FullName));
            }

            return JsonConvert.DeserializeObject<T>(json);
        }
    }
}

예.

var myobject = new MyObject();
_queue.AddMessage( CloudQueueMessageExtensions.Serialize(myobject));

var myobject = _queue.GetMessage().Deserialize<MyObject>();

저장소 대기열이 WebJob 또는 Azure 함수와 함께 사용되는 경우(일반적인 시나리오) 현재 Azure SDK에서는 POCO 개체를 직접 사용할 수 있습니다.다음 예를 참조하십시오.

참고: SDK는 Newtonsoft를 자동으로 사용합니다.후드 아래 직렬화/직렬화를 위한 Json.

저는 @Akodo_Shado의 연재 방식이 마음에 들었습니다.Newtonsoft.Json업데이트한 날짜는 업데이트했습니다.Azure.Storage.Queues큐에서 개체를 역직렬화하는 "검색 및 삭제" 메서드도 추가했습니다.

public static class CloudQueueExtensions
{
    public static async Task AddMessageAsJsonAsync<T>(this QueueClient queueClient, T objectToAdd) where T : class
    {
        string messageAsJson = JsonConvert.SerializeObject(objectToAdd);
        BinaryData cloudQueueMessage = new BinaryData(messageAsJson);
        await queueClient.SendMessageAsync(cloudQueueMessage);
    }

    public static async Task<T> RetreiveAndDeleteMessageAsObjectAsync<T>(this QueueClient queueClient) where T : class
    {

        QueueMessage[] retrievedMessage = await queueClient.ReceiveMessagesAsync(1);
        if (retrievedMessage.Length == 0) return null;
        string theMessage = retrievedMessage[0].MessageText;
        T instanceOfT = JsonConvert.DeserializeObject<T>(theMessage);
        await queueClient.DeleteMessageAsync(retrievedMessage[0].MessageId, retrievedMessage[0].PopReceipt);

        return instanceOfT;
    }
}

RetreiveAndDeleteMessageAsObjectAsync한 번에 하나의 메시지를 처리하도록 설계되었지만, 당신은 분명히 메시지의 전체 배열을 역직렬화하고 반환하기 위해 다시 쓸 수 있습니다.ICollection<T>또는 그와 유사합니다.

그것은 올바른 방법이 아닙니다.큐는 개체를 저장하기 위한 것이 아닙니다.객체를 블롭 또는 테이블에 넣어야 합니다(비활성화됨).큐 메세게 본체는 sdk1.5로 64kb, 버전이 낮은 8kb 크기 제한이 있는 것 같습니다.Messgae body는 그것을 집는 작업자만을 위해 중요한 데이터를 전송하는 것입니다.

언급URL : https://stackoverflow.com/questions/8550702/passing-object-messages-in-azure-queue-storage

반응형