稀疏矩阵压缩存储之三元组

基础知识

在定义稀疏矩阵时,可以引入一个称为稀疏因子的数,其定义如下:假设对于一个 $m*n$ 矩阵,其中有 $t$ 个元素不为 $0$, 则定义稀疏因子 $δ=t/(m*n)$,一般认为 $δ<=0.05$ 的矩阵为稀疏矩阵。稀疏矩阵如果采用二维数组直接存储将会导致大量的空间存储 $0$ 值,对于空间是一种极大的浪费,因此一般采用压缩存储,目前稀疏矩阵常用三元组顺序表和十字链表来存储,本文将实现采用三元组顺序表压缩稀疏矩阵并实现矩阵常用的操作。

在实现三元组顺序表压缩存储稀疏矩阵之前,预先定义三元组节点结构如下:

1
2
3
4
5
6
7
template<typename DataType>
struct TripleAtom
{
int row; /*行号*/
int col; /*列号*/
DataType ele; /*存放的数据*/
};

需要是实现的矩阵基本运算主要有:

  • 矩阵转置:TransposeMatrix
  • 矩阵加法:AddMatrix
  • 矩阵减法:SubMatrix
  • 矩阵乘法:MultiMatrix
  • 打印矩阵:PrintMatrix

三元组顺序表基本定义

根据上述分析可定义如下基本ADT:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <vector>
using std::vector;

template<typename DataType>
class TripleTable
{
public:
TripleTable(vector<vector<DataType>> &inArray);
~TripleTable();
void TransposeMatrix(); /*常规转置思路*/
void FastTransposeMatrix(); /*快速转置*/
void AddMatrix(TripleTable<DataType> &otherMatrix); /*加上另外一个矩阵,第二个矩阵保持不变*/
void SubMatrix(TripleTable<DataType> &otherMatrix); /*减去另外一个矩阵,第二个矩阵保持不变*/
void MultiMatrix(TripleTable<DataType> &otherMatrix); /*乘上另外一个矩阵,第二个矩阵保持不变*/
vector<vector<DataType>> TransformTo2DArray(); /*转换成二维数组*/
void PrintMatrix(); /*打印矩阵*/
private:
void CalculateFirstNoneZeroArray(); /*计算每行第一个非零元素下标*/
private:
vector<TripleAtom<DataType>> data; /*存储实际的矩阵数据,自包含非零元个数*/
int matrixRowNum; /*矩阵行数*/
int matrixColNum; /*矩阵列数*/
int *rowFirstNoneZeroPos; /*每行第一个非零元素的起始位置,方便乘法*/
};

如上即为整个三元组顺序表的ADT基本结构,具体含义可见注释。接下来依次实现个函数。

三元组顺序表ADT的实现

  • 构造函数

构造函数主要将二维存储的稀疏矩阵转变为三元组形式存储的矩阵。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
template<typename DataType>
TripleTable<DataType>::TripleTable(vector<vector<DataType>> &inArray) : matrixRowNum(-1),
matrixColNum(-1), rowFirstNoneZeroPos(nullptr)
{
if (inArray.size() > 0 && inArray[0].size() > 0) {

matrixRowNum = int(inArray.size());
matrixColNum = int(inArray[0].size());

TripleAtom<DataType> tmpAtom;
for (int i = 0; i < int(inArray.size()); i++) {
for (int j = 0; j < int(inArray[i].size()); j++) {
if (inArray[i][j] != DataType(0)) {
tmpAtom.row = i+1; /*下标从1开始*/
tmpAtom.col = j+1; /*下标从1开始*/
tmpAtom.ele = inArray[i][j];
data.push_back(tmpAtom); /*压入*/
}
}
}

CalculateFirstNoneZeroArray();
}
}
  • 析构函数

析构主要释放rowFirstNoneZeroPos申请的空间,实现如下所示:

1
2
3
4
5
6
7
template<typename DataType>
TripleTable<DataType>::~TripleTable<DataType>()
{
delete [] rowFirstNoneZeroPos;
rowFirstNoneZeroPos = nullptr;
matrixRowNum = -1; matrixColNum = -1;
}
  • 常规矩阵转置

由于三元组顺序表的存储顺序是行优先依次存储,而转置后的三元组行号变成列号,原始行号有序使得转置后的列号有序,因此原始三元组转置后的一行中的列相对顺序是正确的。故此有如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <algorithm>
using std::swap;

template<typename DataType>
void TripleTable<DataType>::TransposeMatrix()
{
TripleAtom<DataType> tmpAtom;
vector<TripleAtom<DataType>> newMatrix;
for (int i = 1; i <= matrixColNum; i++) {
for (size_t j = 0; j < data.size(); j++) {
if (data[j].col == i) {
tmpAtom.row = data[j].col;
tmpAtom.col = data[j].row;
tmpAtom.ele = data[j].ele;
newMatrix.push_back(tmpAtom); /*压入*/
}
}
}

data.assign(newMatrix.begin(), newMatrix.end()); /*转置后的矩阵*/
swap(matrixRowNum, matrixColNum);

CalculateFirstNoneZeroArray();
}
  • 快速矩阵转置

由于在常规转置中,我们需要一行一行来处理,如果每遍历到一个三元组便知道该元素放入的位置,那么可以极大降低时间复杂度,此时我们需要计算一个转置后每行起始的位置,然后在遍历过程中可以快速填写三元组顺序表,具体实现代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include <algorithm>
using std::swap;

template<typename DataType>
void TripleTable<DataType>::FastTransposeMatrix()
{
if (matrixRowNum <= 0 || matrixColNum <= 0) {
return;
}

int *colNum = new int[matrixColNum]; /*每列非零元素数量*/
int *colIndex = new int[matrixColNum]; /*每列第一个非零元素起始下标*/

for (int i = 0; i < matrixColNum; i++) {
colNum[i] = colIndex[0] = 0;
}

for (size_t i = 0; i < data.size(); i++) {
++colNum[data[i].col - 1];
}

for (int i = 1; i < matrixColNum; i++) {
/*计算每一列第一个非零元素下标*/
colIndex[i] = colIndex[i-1] + colNum[i-1];
}

vector<TripleAtom<DataType>> newMatrix(data.size());
TripleAtom<DataType> tmpAtom;
for (size_t i = 0; i < data.size(); i++) {

tmpAtom.row = data[i].col;
tmpAtom.col = data[i].row;
tmpAtom.ele = data[i].ele;

newMatrix[colIndex[tmpAtom.row-1]] = tmpAtom;
++colIndex[tmpAtom.row-1]; /*矩阵下标从1开始*/
}

data.assign(newMatrix.begin(), newMatrix.end()); /*转置后的矩阵*/
swap(matrixRowNum, matrixColNum);

CalculateFirstNoneZeroArray();

delete [] colNum; delete [] colIndex;
}

上述代码增加辅助空间后便将时间复杂度降低了一个量级。

  • 矩阵相加

在进行矩阵加法,首先需要判断两个矩阵是否能够相加,接着使用双下标指针分别指向第一个矩阵和第二个矩阵,如果两个指向的行列号相等只需直接相加,否则需要判断行列号大小,具体实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
template<typename DataType>
void TripleTable<DataType>::AddMatrix(TripleTable<DataType> &otherMatrix)
{
if (matrixRowNum != otherMatrix.matrixRowNum || matrixColNum != otherMatrix.matrixColNum) {
return;
}

vector<TripleAtom<DataType>> newMatrix;
TripleAtom<DataType> tmpAtom;

size_t i = 0, j = 0;

while (i < data.size() && j < otherMatrix.data.size()) {
if (data[i].row < otherMatrix.data[j].row) {
newMatrix.push_back(data[i]); ++i;
}
else if (data[i].row > otherMatrix.data[j].row) {
newMatrix.push_back(otherMatrix.data[j]); ++j;
}
else if (data[i].col < otherMatrix.data[j].col) {
newMatrix.push_back(data[i]); ++i;
}
else if (data[i].col > otherMatrix.data[j].col) {
newMatrix.push_back(otherMatrix.data[j]); ++j;
}
else {
tmpAtom.row = data[i].row;
tmpAtom.col = data[i].col;
tmpAtom.ele = data[i].ele + otherMatrix.data[j].ele;
newMatrix.push_back(tmpAtom);
++i; ++j;
}
}

while (i < data.size()) {
newMatrix.push_back(data[i]); ++i;
}

while (j < otherMatrix.data.size()) {
newMatrix.push_back(otherMatrix.data[j]); ++j;
}

data.assign(newMatrix.begin(), newMatrix.end()); /*相加后的矩阵*/

CalculateFirstNoneZeroArray(); /*重新计算*/
}
  • 矩阵减法

矩阵减法和矩阵加法类似,具体代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
template<typename DataType>
void TripleTable<DataType>::SubMatrix(TripleTable<DataType> &otherMatrix)
{
if (matrixRowNum != otherMatrix.matrixRowNum || matrixColNum != otherMatrix.matrixColNum) {
return;
}

vector<TripleAtom<DataType>> newMatrix;
TripleAtom<DataType> tmpAtom;

size_t i = 0, j = 0;

while (i < data.size() && j < otherMatrix.data.size()) {
if (data[i].row < otherMatrix.data[j].row) {
newMatrix.push_back(data[i]); ++i;
}
else if (data[i].row > otherMatrix.data[j].row) {
tmpAtom.row = otherMatrix.data[j].row;
tmpAtom.col = otherMatrix.data[j].col;
tmpAtom.ele = -otherMatrix.data[j].ele;
newMatrix.push_back(tmpAtom); ++j;
}
else if (data[i].col < otherMatrix.data[j].col) {
newMatrix.push_back(data[i]); ++i;
}
else if (data[i].col > otherMatrix.data[j].col) {
tmpAtom.row = otherMatrix.data[j].row;
tmpAtom.col = otherMatrix.data[j].col;
tmpAtom.ele = -otherMatrix.data[j].ele;
newMatrix.push_back(tmpAtom); ++j;
}
else {
tmpAtom.row = data[i].row;
tmpAtom.col = data[i].col;
tmpAtom.ele = data[i].ele - otherMatrix.data[j].ele;
newMatrix.push_back(tmpAtom);
++i; ++j;
}
}

while (i < data.size()) {
newMatrix.push_back(data[i]); ++i;
}

while (j < otherMatrix.data.size()) {
tmpAtom.row = otherMatrix.data[j].row;
tmpAtom.col = otherMatrix.data[j].col;
tmpAtom.ele = -otherMatrix.data[j].ele;
newMatrix.push_back(tmpAtom); ++j;
}

data.assign(newMatrix.begin(), newMatrix.end()); /*相减后的矩阵*/

CalculateFirstNoneZeroArray(); /*重新计算*/
}
  • 矩阵乘法

矩阵乘法相对而言比较复杂,矩阵乘法如果偷懒可以先解压缩然后使用常规乘法进行计算最后再把结果进行压缩,但这样就不能体现三元组的作用了,因此需要找出一种可以直接在三元组上进行计算的乘法算法。考虑到矩阵乘法的实质是从第一个矩阵抽出一个行向量和从第二个矩阵抽出一个列向量并做向量的点乘,其中行、列号分别对应在结果矩阵中的行、列号,综合上述分析我们可以一行一行处理,其首先也需要判断运算是否合法,具体代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
template<typename DataType>
void TripleTable<DataType>::MultiMatrix(TripleTable<DataType> &otherMatrix)
{
if (matrixColNum != otherMatrix.matrixRowNum) {
return;
}
if (data.size() <= 0 || otherMatrix.data.size() <= 0) {
/*如果有全零矩阵则相乘结果也是一个全零矩阵*/
data.clear(); return;
}

DataType *tmpAccumulate = new int[otherMatrix.matrixColNum]; /*记录每行各列的累加和*/

vector<TripleAtom<DataType>> newMatrix;
/*一行一行处理*/
for (int i = 0; i < matrixRowNum; i++) {
for (int j = 0; j < otherMatrix.matrixColNum; j++) {
tmpAccumulate[j] = DataType(0); /*清空累加器*/
}

/*行开始的地方*/
int rowBegin = rowFirstNoneZeroPos[i];
/*行结束的地方*/
int rowEnd = i >= matrixRowNum-1 ? int(data.size()) : rowFirstNoneZeroPos[i+1];

for (int j = rowBegin; j < rowEnd; j++) {
/*(m,n)*(n,k)矩阵的行,列对应下标相等才乘*/
int middleIndex = data[j].col - 1;
int anotherRowBegin = otherMatrix.rowFirstNoneZeroPos[middleIndex];
int anotherRowEnd = middleIndex >= otherMatrix.matrixRowNum-1 ? int(otherMatrix.data.size()) : otherMatrix.rowFirstNoneZeroPos[middleIndex+1];
for (int k = anotherRowBegin; k < anotherRowEnd; k++) {
tmpAccumulate[otherMatrix.data[k].col -1] += data[j].ele * otherMatrix.data[k].ele;
}
}

/*逐行压缩*/
TripleAtom<DataType> tmpAtom;
for (int j = 0; j < otherMatrix.matrixColNum; j++) {
if (tmpAccumulate[j] != DataType(0)) {
tmpAtom.row = i+1; /*下标从1开始*/
tmpAtom.col = j+1; /*下标从1开始*/
tmpAtom.ele = tmpAccumulate[j];
newMatrix.push_back(tmpAtom);
}
}
}

/*修改列数*/
matrixColNum = otherMatrix.matrixColNum;
data.assign(newMatrix.begin(), newMatrix.end()); /*相乘后的矩阵*/

CalculateFirstNoneZeroArray(); /*重新计算*/

delete [] tmpAccumulate;
}
  • 压缩矩阵转成二维数组

直接遍历即可,不存在意味着是零值。

1
2
3
4
5
6
7
8
9
10
11
template<typename DataType>
vector<vector<DataType>> TripleTable<DataType>::TransformTo2DArray()
{
vector<vector<DataType>> result(matrixRowNum, vector<DataType>(matrixColNum, DataType(0)));

for (size_t i = 0; i < data.size(); i++) {
result[data[i].row-1][data[i].col-1] = data[i].ele;
}

return result;
}
  • 打印矩阵

先调用转二维函数然后直接打印即可,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <iostream>
using std::cout;
using std::endl;

template<typename DataType>
void TripleTable<DataType>::PrintMatrix()
{
vector<vector<DataType>> result = TransformTo2DArray();

for (size_t i = 0; i < result.size(); i++) {
for (size_t j = 0; j < result[i].size(); j++) {
cout << result[i][j] << ";";
}
cout << endl;
}
}
  • 计算每行元素的第一个下标值

这在快速矩阵转置中实现过类似的思路,直接把代码稍作修改即可,结果见下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
template<typename DataType>
void TripleTable<DataType>::CalculateFirstNoneZeroArray()
{
if (rowFirstNoneZeroPos != nullptr) {
delete [] rowFirstNoneZeroPos;
}

/*正式给rowFirstNoneZeroPos申请空间*/
rowFirstNoneZeroPos = new int[matrixRowNum];

int *rowlNum = new int[matrixRowNum]; /*每列非零元素数量*/


for (int i = 0; i < matrixRowNum; i++) {
rowlNum[i] = rowFirstNoneZeroPos[i] = 0;
}

for (size_t i = 0; i < data.size(); i++) {
++rowlNum[data[i].row - 1];
}

for (int i = 1; i < matrixRowNum; i++) {
/*计算每一列第一个非零元素下标*/
rowFirstNoneZeroPos[i] = rowFirstNoneZeroPos[i-1] + rowlNum[i-1];
}

delete [] rowlNum;
}

代码测试

通过随机生成两个稀疏矩阵,然后依次测试各个函数是否正确,测试代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#include <vector>
#include <random>
#include <ctime>
#include <iostream>
#include <algorithm>
#include <chrono>
#include <functional>
using namespace std;
using namespace chrono;

int main()
{
default_random_engine randEngine(unsigned(time(nullptr)));
uniform_int_distribution<int> intDis(int(-100), int(100)); /*不能太大,防止溢出*/
uniform_int_distribution<int> mnDis(int(1), int(100));

int testTimes = 5;

while ((testTimes--) > 0) {

int m = mnDis(randEngine);
int n = mnDis(randEngine);
int k = mnDis(randEngine);

uniform_int_distribution<int> deltaDis1(int(1), int(m*n));
uniform_int_distribution<int> deltaDis2(int(1), int(n*k));

vector<vector<int>> testMatrix1(m, vector<int>(n, 0));
vector<vector<int>> testMatrix2(m, vector<int>(n, 0));
vector<vector<int>> testMatrix3(n, vector<int>(k, 0));

int delta1 = deltaDis1(randEngine);

/*这里偷个懒,矩阵1,2同时生成*/
for (int i = 0; i < delta1; i++) {
int row = mnDis(randEngine) % m;
int col = mnDis(randEngine) % n;
testMatrix1[row][col] = intDis(randEngine);

row = mnDis(randEngine) % m;
col = mnDis(randEngine) % n;
testMatrix2[row][col] = intDis(randEngine);
}

int delta2 = deltaDis2(randEngine);

for (int i = 0; i < delta2; i++) {
int row = mnDis(randEngine) % n;
int col = mnDis(randEngine) % k;
testMatrix3[row][col] = intDis(randEngine);
}

TripleTable<int> tripleTest1(testMatrix1);
TripleTable<int> tripleTest2(testMatrix2);
TripleTable<int> tripleTest3(testMatrix3);

tripleTest1.TransposeMatrix(); tripleTest1.TransposeMatrix();
cout << "常规转置测试结果:" << (tripleTest1.TransformTo2DArray() == testMatrix1 ? "Correct" : "Wrong") << endl;

tripleTest1.FastTransposeMatrix(); tripleTest1.FastTransposeMatrix();
cout << "快速转置测试结果:" << (tripleTest1.TransformTo2DArray() == testMatrix1 ? "Correct" : "Wrong") << endl;

vector<vector<int>> addMatrix(m, vector<int>(n, 0));
for (size_t i = 0; i < addMatrix.size(); i++) {
for (size_t j = 0; j < addMatrix[i].size(); j++) {
addMatrix[i][j] = testMatrix1[i][j] + testMatrix2[i][j];
}
}

tripleTest1.AddMatrix(tripleTest2);
cout << "矩阵加法测试结果:" << (tripleTest1.TransformTo2DArray() == addMatrix ? "Correct" : "Wrong") << endl;

tripleTest1.SubMatrix(tripleTest2);
cout << "矩阵减法测试结果:" << (tripleTest1.TransformTo2DArray() == testMatrix1 ? "Correct" : "Wrong") << endl;

vector<vector<int>> multiMatrix(m, vector<int>(k, 0));

for (size_t i = 0; i < multiMatrix.size(); i++) {
for (size_t j = 0; j < multiMatrix[i].size(); j++) {
for (int k = 0; k < n; k++) {
multiMatrix[i][j] += testMatrix1[i][k]*testMatrix3[k][j];
/*矩阵乘法*/
}
}
}

tripleTest1.MultiMatrix(tripleTest3);
cout << "矩阵乘法测试结果:" << (tripleTest1.TransformTo2DArray() == multiMatrix ? "Correct" : "Wrong") << endl;

/*tripleTest1.PrintMatrix();*/
}
return 0;
}

参考资料

稀疏矩阵

数据结构