csv.ts

import * as csvParse from 'csv-parse';
import {stringify, Options} from 'csv-stringify/sync';
import * as fs from 'fs-extra';

const csvOptions: csvParse.Options = {
  skip_empty_lines: true,
};

interface Row {
  type: 'row';
  value: string[];
}
interface Error {
  type: 'error';
  error: any;
}
interface Done {
  type: 'done';
}
type RowResult = Row | Error | Done;

function isPromise(x: any): x is Promise<any> {
  return 'then' in x;
}

/** Read a CSV file line-by-line. */
export async function* readRows(file: string) {
  const parser = csvParse.parse(csvOptions);
  const stream = fs.createReadStream(file, 'utf8');

  // The parser's event handlers push parsed rows (plus any error or a 'done'
  // marker) onto `rows`, separated by "barrier" promises. The consumer loop
  // below awaits each barrier it reaches; each handler resolves the pending
  // barrier after pushing new entries, which wakes the consumer.
  let dataCallback: () => void | undefined;
  const mkBarrier = () =>
    new Promise<void>((resolve, reject) => {
      dataCallback = resolve;
    });

  // TODO(danvk): use a deque
  const rows: (RowResult | Promise<void>)[] = [mkBarrier()];

  parser.on('readable', () => {
    let row;
    while ((row = parser.read())) {
      rows.push({type: 'row', value: row});
    }
    const oldCb = dataCallback;
    rows.push(mkBarrier());
    oldCb();
  });

  parser.on('error', error => {
    rows.push({type: 'error', error});
    parser.pause();
    dataCallback();
  });

  parser.on('finish', () => {
    rows.push({type: 'done'});
    dataCallback();
  });

  stream.pipe(parser);

  // Drain the queue: yield rows, await barriers, throw on error, stop on 'done'.
  while (true) {
    const row = rows.shift();
    if (!row) {
      break;
    } else if (isPromise(row)) {
      await row;
    } else {
      if (row.type === 'row') {
        yield row.value;
      } else if (row.type === 'error') {
        throw new Error(row.error);
      } else if (row.type === 'done') {
        break;
      }
    }
  }
}
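
// Example usage (illustrative; 'data.csv' is a hypothetical path):
//
//   for await (const row of readRows('data.csv')) {
//     console.log(row);  // each row is a string[] of cell values
//   }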

/** Read just the headers from a CSV file. */
export async function readHeaders(file: string) {
  for await (const row of readRows(file)) {
    return row;
  }
  throw new Error(`Unexpected empty file: ${file}`);
}

/** Write a CSV file */
export async function writeCsv(file: string, rows: string[][], options?: Options) {
  // TODO(danvk): make this less memory-intensive
  const output = stringify(rows, options);
  await fs.writeFile(file, output, {encoding: 'utf8'});
}

const LF = '\n'.charCodeAt(0);
const CR = '\r'.charCodeAt(0);

/** Determine the type of line endings a file uses by looking for the first one. */
export function detectLineEnding(path: string) {
  const f = fs.openSync(path, 'r');
  const SIZE = 10_000;
  const buffer = Buffer.alloc(SIZE);
  const n = fs.readSync(f, buffer, 0, SIZE, 0);
  fs.closeSync(f);
  for (let i = 0; i < n - 1; i++) {
    const [a, b] = [buffer[i], buffer[i + 1]];
    if (a == CR && b == LF) {
      return '\r\n'; // Windows
    } else if (a == LF) {
      return '\n'; // Unix
    } else if (a == CR) {
      return '\r'; // Old Mac
    }
  }
  return undefined;
}
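
// Example usage (illustrative; 'data.csv' is a hypothetical path):
//
//   const eol = detectLineEnding('data.csv');  // '\r\n', '\n', '\r', or undefined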

/**
 * Append one row to a CSV file.
 *
 * If the row contains a new header, the entire file will be rewritten.
 */
export async function appendRow(file: string, row: {[column: string]: string}) {
  const exists = await fs.pathExists(file);
  if (!exists) {
    // Easy: write the whole file.
    const header = Object.keys(row);
    const rows = [header, header.map(k => row[k])];
    return writeCsv(file, rows);
  }

  const lineEnding = detectLineEnding(file);
  const lines = readRows(file);
  const headerRow = await lines.next();
  if (headerRow.done) {
    throw new Error(`CSV file ${file} was empty`);
  }
  const headers = headerRow.value;
  const headerToIndex: {[header: string]: number} = {};
  headers.forEach((header, i) => {
    headerToIndex[header] = i;
  });

  // Check if there are any new headers in the row.
  const newHeaders = [];
  for (const k in row) {
    if (!(k in headerToIndex)) {
      newHeaders.push(k);
    }
  }

  if (newHeaders.length) {
    // The header changed: rewrite the whole file with the expanded header,
    // padding existing rows with empty cells for the new columns.
    const fullHeaders = headers.concat(newHeaders);
    const rows = [fullHeaders];
    const emptyCols = newHeaders.map(() => '');
    for await (const row of lines) {
      rows.push(row.concat(emptyCols));
    }
    rows.push(fullHeaders.map(k => row[k] || ''));
    await writeCsv(file, rows, {record_delimiter: lineEnding});
  } else {
    // write the new row
    const newRow = headers.map(k => row[k] || '');
    await lines.return(); // close the file for reading.
    // Add a newline if the file doesn't end with one (fall back to '\n' if no
    // line ending was detected).
    const f = fs.openSync(file, 'a+');
    const {size} = fs.fstatSync(f);
    const {buffer} = await fs.read(f, Buffer.alloc(2), 0, 2, size - 2);
    const tail = buffer.toString('utf8');
    const hasTrailingNewline = tail.endsWith(lineEnding ?? '\n');
    const lineStr =
      (hasTrailingNewline ? '' : (lineEnding ?? '\n')) +
      stringify([newRow], {record_delimiter: lineEnding});
    await fs.appendFile(f, lineStr);
    await fs.close(f);
  }
}
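
// Example usage (illustrative; 'data.csv' is a hypothetical path):
//
//   await appendRow('data.csv', {name: 'Ada', year: '1843'});
//   // Rewrites the file with a 'year' column if one isn't already present.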

/**
 * Delete the last row of a CSV file and return it.
 *
 * Note: this might change line endings in the file.
 */
export async function deleteLastRow(file: string) {
  const rows = [];
  for await (const row of readRows(file)) {
    rows.push(row);
  }
  await writeCsv(file, rows.slice(0, -1));
  return rows[rows.length - 1];
}

/** Read a CSV file as objects keyed by the columns in its header row. */
export async function* readRowObjects(file: string) {
  let header: string[] | undefined;
  for await (const row of readRows(file)) {
    if (!header) {
      header = row;
    } else {
      const rowObj: {[column: string]: string} = {};
      for (const [i, col] of row.entries()) {
        rowObj[header[i]] = col;
      }
      yield rowObj;
    }
  }
}

/** Read an entire CSV file as an array of objects keyed by its header row. */
export async function readAllRowObjects(file: string) {
  const objs: {[column: string]: string}[] = [];
  for await (const obj of readRowObjects(file)) {
    objs.push(obj);
  }
  return objs;
}
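
// Example usage (illustrative; 'data.csv' is a hypothetical path with a header row):
//
//   for await (const obj of readRowObjects('data.csv')) {
//     console.log(obj.name);  // assumes a 'name' column exists
//   }
//   const all = await readAllRowObjects('data.csv');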