作者:逝去成回忆2502920253 | 来源:互联网 | 2023-05-17 09:10
最近在使用ApacheActiveMQ消息队列做一些工作,在使用的时候,发现发送消息的内容是中文时,获得消息时,获得的是乱码。发送和接收我使用的是ActiveMQ.NET1.1.0组件。
最近在使用 Apache ActiveMQ 消息队列做一些工作,在使用的时候,发现发送消息的内容是中文时,获得消息时,获得的是乱码。
发送和接收我使用的是 ActiveMQ .NET 1.1.0 组件。
分析原因进去后,发现是这个开源组件的 Apache.NMS.ActiveMQ 组件的 /Apache.NMS.ActiveMQ-1.1.0-src/src/main/csharp/Transport/Stomp/StompWireFormat.cs 文件对应的 StompWireFormat 类的 public Object Unmarshal(BinaryReader dis) 方法中有错误。
错误在该文件的 166 行
之前的错误代码如下:
public Object Unmarshal(BinaryReader dis)
{
string command;
do {
command = ReadLine(dis);
}
while (command == "");
Tracer.Debug("<<<" + command);
IDictionary headers = new Hashtable();
string line;
while ((line = ReadLine(dis)) != "")
{
int idx = line.IndexOf(':');
if (idx > 0)
{
string key = line.Substring(0, idx);
string value = line.Substring(idx + 1);
headers[key] = value;
Tracer.Debug("<<<" + key + " = " + value);
}
else
{
// lets ignore this bad header!
}
}
byte[] cOntent= null;
string length = ToString(headers["content-length"]);
if (length != null)
{
int size = Int32.Parse(length);
cOntent= dis.ReadBytes(size);
// Read the terminating NULL byte for this frame.
int nullByte = dis.Read();
if(nullByte != 0)
{
Tracer.Debug("<<<");
}
}
else
{
MemoryStream ms = new MemoryStream();
int nextChar;
while((nextChar = dis.Read()) != 0)
{
if( nextChar <0 )
{
// EOF ??
break;
}
ms.WriteByte((byte)nextChar);
}
cOntent= ms.ToArray();
}
Object answer = CreateCommand(command, headers, content);
Tracer.Debug("<<<" + answer);
return answer;
}
internal String ReadLine(BinaryReader dis)
{
MemoryStream ms = new MemoryStream();
while (true)
{
int nextChar = dis.Read();
if (nextChar <0)
{
throw new IOException("Peer closed the stream.");
}
if( nextChar == 10 )
{
break;
}
ms.WriteByte((byte)nextChar);
}
byte[] data = ms.ToArray();
return encoding.GetString(data, 0, data.Length);
}
这个源代码文件可以在下面地址看到:
https://svn.apache.org/repos/asf/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.1.0/src/main/csharp/Transport/Stomp/StompWireFormat.cs
正确的代码如下:
internal string ReadLine(NetworkStream ns)
{
MemoryStream ms = new MemoryStream();
while (true)
{
int nextChar = ns.ReadByte();
if (nextChar <0)
{
throw new IOException("Peer closed the stream.");
}
if (nextChar == 10)
{
break;
}
ms.WriteByte((byte)nextChar);
}
byte[] data = ms.ToArray();
return encoding.GetString(data, 0, data.Length);
}
public Object Unmarshal(BinaryReader dis)
{
NetworkStream ns = dis.BaseStream as NetworkStream;
if (ns == null) return null;
if (!ns.CanRead) return null;
// 读取 command 信息
string command;
do
{
command = ReadLine(ns);
}
while (command == "");
Tracer.Debug("<<<" + command);
// 读取 header 信息
IDictionary headers = new Hashtable();
string line;
while ((line = ReadLine(ns)) != "")
{
int idx = line.IndexOf(':');
if (idx > 0)
{
string key = line.Substring(0, idx);
string value = line.Substring(idx + 1);
headers[key] = value;
Tracer.Debug("<<<" + key + " = " + value);
}
else
{
// lets ignore this bad header!
}
}
// 读取消息内容
MemoryStream ms = new MemoryStream();
do
{
int t = ns.ReadByte();
if (t <= 0) break;
ms.WriteByte((byte)t);
}
while (ns.DataAvailable);
byte[] cOntent= ms.ToArray();
Object answer = CreateCommand(command, headers, content);
Tracer.Debug("<<<" + answer);
return answer;
}
之前的错误代码,在发送“a1郭红俊b2” 这样的中英文数字混合的信息时,发送时,发送的 byte数组 信息如下:
97,49,233,131,173,231,186,162,228,191,138,98,50
接受时,接受到的 byte 数组信息就变成了
97,49,239,191,189,239,191,189,98,50
原先的 BinaryReader dis 其实是个这个开源组件自己写的派生自BinaryReader 的 OpenWireBinaryReader 类。这个类有很多不完善的地方。
这部分的逻辑可以在 TcpTransport 类的下面调用中看到
private readonly Socket socket;
private BinaryReader socketReader;
socketReader = new OpenWireBinaryReader(new NetworkStream(socket));
原先的 OpenWireBinaryReader 不完善的地方:
1、传送中文时,会丢数据;ms.WriteByte((byte)nextChar); 会让本来nextChar对应的 byte 数组,只取了数组的第一项,数组的其他项则丢失了;
2、编码混乱,unicode 和 utf-8 转换有问题。
参考资料:
Apache ActiveMQ
http://activemq.apache.org/
ActiveMQ .NET
http://activemq.apache.org/nms/