JavaScriptでメモリ制御しながらローカルファイルを加工する

ローカルファイルの加工なんぞPythonなりNodeなりJavaなり好きな言語でやればよいのだが、たまに環境の制約上ブラウザのJavascriptを使ってデータ加工をやりたい時がある。例えば、ツールをチームメンバと共有する場合などで、特段のセットアップなしで動くデータ加工ツールを作ろうとすると、PowershellかVBSかJavaScriptくらいしか選択肢がない。
その上で、GUI含めて作ろうとすると、JavaScriptが一番楽な場合があったりする。
ただ、巨大なファイルを扱う場合は、分割読みしたり、分割出力したり、それなりに考慮したコードを書かなきゃいけないことが多かった。今までは都度書いていたのだが、汎用的に使えるコードを作っておきたいと思ったので書いてみた。

HTML

<input type="file" id="file" multiple>

基本はファイル用のINPUTだけでOK。

JavaScript

パラメータ設定

const charset = "utf-8"; //入力文字コード
const readChunkSize = 1024*1024; //分割入力データサイズ(byte)
const writeChunkSize = 1024*1024; //分割出力データサイズ(char)
const merge = false; //入力ファイルが複数の場合に出力をマージするか否か
const parallel = false; //入力ファイルが複数の場合に並列で処理するか否か

挙動制御用のパラメータ。

データ加工内容定義

class Converter {
  //初期化処理
  constructor() {
    this.counter = 0;
  }

  //入力ファイル単位で1行目を処理する前に実行する処理
  preProcess() {
  }

  //行に対して実行する処理
  //引数は行データ(改行なし)
  recordProcess(record) {
    this.counter += 1
    return `${record}\n`;
  }

  //入力ファイル単位で全ての行を処理した後に実行する処理
  postProcess() {
    console.log(`post line num:${this.counter}`);
  }

  //ファイルマージが有効の場合にのみ呼び出される
  //全てのファイルの全ての行が処理された後に実行する処理
  terminate() {
    console.log(`total line num:${this.counter}`);
  }
}

データ加工内容。やりたい内容に応じて、このクラスだけを変更する想定。
constructor以外のメソッドは、returnで文字列を返却することで、出力ファイルにデータを書き出すことができる。
上記サンプルではrecordProcessでのみ改行を付与した文字列を応答しているので、入力ファイルと同じ内容のファイルを出力することになる。

ファイル読込処理

const inputFile = document.getElementById('file');
inputFile.addEventListener('change', fileChange, false);
function fileChange(e) {
  const files = e.target.files;
  if (files.length == 0) return;  
  if (parallel) {
    var writer = parallelRead(files, charset, readChunkSize, writeChunkSize, merge)
  } else {
    var writer = serialRead(files, charset, readChunkSize, writeChunkSize, merge)
  }
  if (merge) terminate(files.length, writer);
}

処理の入り口。ファイルセレクタの変更をトリガーにfileChangeを起動する。fileChangeは並列もしくは直列でファイルを読み込み、(マージが有効な場合のみ)全ファイルの読込完了を判定する関数であるterminateを呼び出す。

並列ファイル読込処理

function parallelRead(files, charset, readChunkSize, writeChunkSize, merge) {
  if (merge) var manager = new ChunkManager(charset, files[0].name, writeChunkSize, merge)
  for (let i=0; i<files.length; i++) {
    if (!merge) var manager = new ChunkManager(charset, files[i].name, writeChunkSize, merge)
    readFile(files[i], readChunkSize, manager);
  }
  return manager.writer
}

ファイルを並列で読み込む場合に呼び出される関数。ファイル出力を制御するChunkManagerを生成し、readFileに渡す。ChunkManagerは、マージ有効時にはを1つだけ、無効時には入力ファイル数と同じだけ生成する。

直列ファイル読込処理

function serialRead(files, charset, readChunkSize, writeChunkSize, merge, fileIdx=0, manager){
  let doRead = false
  if (merge) {
    if (!manager) var manager = new ChunkManager(charset, files[0].name, writeChunkSize, merge);
    if (manager.writer.fileCount == fileIdx) doRead = true;
  } else {
    if (!manager || manager.writer.fileCount == 1) {
      var manager = new ChunkManager(charset, files[fileIdx].name, writeChunkSize, merge);
      doRead = true
    }
  }
  if (doRead) {
    readFile(files[fileIdx], readChunkSize, manager);
    if (files.length-1 > fileIdx) {
      serialRead(files, charset, readChunkSize, writeChunkSize, merge, fileIdx+1, manager)
    }
  } else {
    setTimeout(function(){serialRead(files, charset, readChunkSize, writeChunkSize, merge, fileIdx, manager);}, 1000);
  }
  return manager.writer
}

ファイルを直列で読み込む場合に呼び出される関数。ファイル出力を制御するChunkManagerクラスのインスタンスを生成し、readFileに渡す。ChunkManagerは、マージ有効時にはを1つだけ、無効時には入力ファイル数と同じだけ生成する。加工処理中のファイルを1つに保つため、処理状況を判定し、処理中の場合は再起呼び出しを行っている。

読込終了処理

function terminate(fileNum, writer) {
  console.log(`terminate check... ${writer.fileCount}/${fileNum}`)
  if (writer.fileCount == fileNum) {
    console.log(`terminate!!`)
    writer.terminate();
  } else {
    setTimeout(function(){terminate(fileNum, writer);}, 1000);
  }
}

全ファイルの読込完了を判定する関数。マージが有効な場合にのみ呼び出される。待ち合わせのため、処理状況を判定し、処理中のファイルがある場合は再起呼び出しを行っている。

ファイル分割読込処理

function readFile(blob, readChunkSize, manager) {
  let offset = 0;

  const fileReader = new FileReader();
  fileReader.onload = function(e) {
    manager.add(e.target.result);
    if (offset < blob.size) {
      read();
    } else {
      console.log(`read end ${blob.name}`)
      manager.writer.postProcess();
    }
  }
  fileReader.onerror = function(e) {
    console.error(e.target.error.name);
  }

  console.log(`read start ${blob.name}`)
  manager.writer.preProcess();
  read()

  function read() {
    const chunk = blob.slice(offset, offset+readChunkSize, blob.type);
    offset += readChunkSize;
    fileReader.readAsArrayBuffer(chunk);
  }
}

分割読込を行う処理。readChunkSizeバイトずつ入力ファイルを読み込み、ChunkManageradd関数に渡している。

行データ抽出クラス

class ChunkManager {
  constructor(charset, name, writeChunkSize, merge) {
      this.buffer = new Uint8Array(0);
      this.decoder = new TextDecoder(charset);
      this.writer = new Writer(name, writeChunkSize, merge);
  }

  add(chunk) {
    let newlineIndex = 0;
    let byteArray = [];

    const data = new Uint8Array(this.buffer.byteLength + chunk.byteLength);
    data.set(this.buffer, 0);
    data.set(new Uint8Array(chunk), this.buffer.byteLength);

    for(let i=0; i<data.length; i++) {
      if(data[i] === 10 || data[i] === 13) { // 10:LF(\n),13:CR(\r)
          if(data[i] === 13 && data[i+1] === 10) i++; // CRLF

          this.writer.recordProcess(this.decoder.decode(new Uint8Array(byteArray)));

          newlineIndex = i + 1;
          byteArray = [];
      } else {
          byteArray.push(data[i]);
      }
    }

    this.buffer = data.slice(newlineIndex);
  }
}

分割読込したデータを制御し行単位にまとめるクラス。コンストラクタでWriterクラスのインスタンスを生成する。行データを見つけ、WriterrecordProcess関数に渡している。

ファイル出力クラス

class Writer {
  constructor(name, writeChunkSize, merge) {
    const parts = name.split(".")
    this.name = parts.slice(0,parts.length-1).join(".");
    this.extension = parts[parts.length-1];
    this.writeChunkSize = writeChunkSize;
    this.merge = merge;
    this.fileCount = 0;
    this.writeCount = 0;
    this.charCount = 0;
    this.records = [];
    this.converter = new Converter();
  }

  preProcess() {
    const text = this.converter.preProcess();
    if (text) this.records.push(text);
  }

  recordProcess(record) {
    const text = this.converter.recordProcess(record);
    if (text) {
      if ((this.charCount + text.length) > this.writeChunkSize) {
        this.write();
      }
      this.charCount += text.length
      this.records.push(text);
    }
  }

  postProcess() {
    const text = this.converter.postProcess();
    if (text) this.records.push(text);
    if (!this.merge) this.write(this.defaultName(this.writeCount == 0));
    this.fileCount += 1;
  }

  terminate() {
    const text = this.converter.terminate();
    if (text) this.records.push(text);
    if (this.merge) this.write(this.defaultName(this.writeCount == 0));
  }

  defaultName(isSingleFile) {
    if (isSingleFile) {
      return `${this.name}.${this.extension}`
    } else {
      return `${this.name}${this.writeCount}.${this.extension}`
    }
  }

  write(fileName = this.defaultName()) {
    if (this.records[0]) {
      const blob = new Blob(this.records, {type: 'text/plain'});
      this.download(blob, fileName);
      this.writeCount += 1;
      this.charCount = 0;
      this.records = [];
    }
  }

  download(blob, fileName) {
    const url = URL.createObjectURL(blob, fileName);
    const a = document.createElement("a");
    document.body.appendChild(a);
    a.download = fileName;
    a.href = url;
    a.click();
    a.remove();
    URL.revokeObjectURL(url);
  }
}

ファイル出力を司るクラス。コンストラクタでConverterクラスのインスタンスを生成する。様々なタイミングで起動される関数で、Converterの関数を呼び出し、受け取った加工済データをバッファリング(this.records)する。蓄積したデータの文字数がwriteChunkSizeを超えそうになったタイミング、もしくは、入力ファイルの読み込みが完了したタイミングで、加工済みデータをファイル出力する。

まとめ

まとめるとこんな感じ。パラメータを画面から制御できるように拡張している以外は、上に記載した内容をまとめたもの。

<li><input type="file" id="file" multiple></li>
<hr>
<li><label for="charset">文字コード:</label>
<input type="test" id="charset" value="utf-8"></li>
<li><label for="readChunkSize">読込チャンクサイズ:</label>
<input type="number" id="readChunkSize" value="1000000"></li>
<li><label for="writeChunkSize">書込チャンクサイズ:</label>
<input type="number" id="writeChunkSize" value="1000000"></li>
<li><label for="merge">ファイルをマージ:</label>
<input type="checkbox" id="merge"></li>
<li><label for="parallel">parallel実行:</label>
<input type="checkbox" id="parallel"></li>
<style>
li {
  margin-left:10px;
  margin-bottom:10px;
  list-style: none;
}
label {
  display:block;
  float:left;
  width:200px;
}
</style>
<script>
class Converter {
  //初期化処理
  constructor() {
    this.counter = 0;
  }

  //入力ファイル単位で1行目を処理する前に実行する処理
  preProcess() {
  }

  //行に対して実行する処理
  //引数は行データ(改行なし)
  recordProcess(record) {
    this.counter += 1
    return `${record}\n`;
  }

  //入力ファイル単位で全ての行を処理した後に実行する処理
  postProcess() {
    console.log(`post line num:${this.counter}`);
  }

  //ファイルマージが有効の場合にのみ呼び出される
  //全てのファイルの全ての行が処理された後に実行する処理
  terminate() {
    console.log(`total line num:${this.counter}`);
  }
}

const inputFile = document.getElementById('file');
inputFile.addEventListener('change', fileChange, false);
function fileChange(e) {
  const charset = document.getElementById('charset').value; //入力文字コード
  const readChunkSize = document.getElementById('readChunkSize').value;  //分割入力データサイズ(byte)
  const writeChunkSize = document.getElementById('writeChunkSize').value; //分割出力データサイズ(char)
  const merge = document.getElementById('merge').checked; //入力ファイルが複数の場合に出力をマージするか否か
  const parallel = document.getElementById('parallel').checked; //入力ファイルが複数の場合に並列で処理するか否か

  const files = e.target.files;
  if (files.length == 0) return;  
  if (parallel) {
    var writer = parallelRead(files, charset, readChunkSize, writeChunkSize, merge)
  } else {
    var writer = serialRead(files, charset, readChunkSize, writeChunkSize, merge)
  }
  if (merge) terminate(files.length, writer);
}

function parallelRead(files, charset, readChunkSize, writeChunkSize, merge) {
  if (merge) var manager = new ChunkManager(charset, files[0].name, writeChunkSize, merge)
  for (let i=0; i<files.length; i++) {
    if (!merge) var manager = new ChunkManager(charset, files[i].name, writeChunkSize, merge)
    readFile(files[i], readChunkSize, manager);
  }
  return manager.writer
}

function serialRead(files, charset, readChunkSize, writeChunkSize, merge, fileIdx=0, manager){
  let doRead = false
  if (merge) {
    if (!manager) var manager = new ChunkManager(charset, files[0].name, writeChunkSize, merge);
    if (manager.writer.fileCount == fileIdx) doRead = true;
  } else {
    if (!manager || manager.writer.fileCount == 1) {
      var manager = new ChunkManager(charset, files[fileIdx].name, writeChunkSize, merge);
      doRead = true
    }
  }
  if (doRead) {
    readFile(files[fileIdx], readChunkSize, manager);
    if (files.length-1 > fileIdx) {
      serialRead(files, charset, readChunkSize, writeChunkSize, merge, fileIdx+1, manager)
    }
  } else {
    setTimeout(function(){serialRead(files, charset, readChunkSize, writeChunkSize, merge, fileIdx, manager);}, 1000);
  }
  return manager.writer
}

function terminate(fileNum, writer) {
  console.log(`terminate check... ${writer.fileCount}/${fileNum}`)
  if (writer.fileCount == fileNum) {
    console.log(`terminate!!`)
    writer.terminate();
  } else {
    setTimeout(function(){terminate(fileNum, writer);}, 1000);
  }
}

function readFile(blob, readChunkSize, manager) {
  let offset = 0;

  const fileReader = new FileReader();
  fileReader.onload = function(e) {
    manager.add(e.target.result);
    if (offset < blob.size) {
      read();
    } else {
      console.log(`read end ${blob.name}`)
      manager.writer.postProcess();
    }
  }
  fileReader.onerror = function(e) {
    console.error(e.target.error.name);
  }

  console.log(`read start ${blob.name}`)
  manager.writer.preProcess();
  read()

  function read() {
    const chunk = blob.slice(offset, offset+readChunkSize, blob.type);
    offset += readChunkSize;
    fileReader.readAsArrayBuffer(chunk);
  }
}

class ChunkManager {
  constructor(charset, name, writeChunkSize, merge) {
      this.buffer = new Uint8Array(0);
      this.decoder = new TextDecoder(charset);
      this.writer = new Writer(name, writeChunkSize, merge);
  }

  add(chunk) {
    let newlineIndex = 0;
    let byteArray = [];

    const data = new Uint8Array(this.buffer.byteLength + chunk.byteLength);
    data.set(this.buffer, 0);
    data.set(new Uint8Array(chunk), this.buffer.byteLength);

    for(let i=0; i<data.length; i++) {
      if(data[i] === 10 || data[i] === 13) { // 10:LF(\n),13:CR(\r)
          if(data[i] === 13 && data[i+1] === 10) i++; // CRLF

          this.writer.recordProcess(this.decoder.decode(new Uint8Array(byteArray)));

          newlineIndex = i + 1;
          byteArray = [];
      } else {
          byteArray.push(data[i]);
      }
    }

    this.buffer = data.slice(newlineIndex);
  }
}

class Writer {
  constructor(name, writeChunkSize, merge) {
    const parts = name.split(".")
    this.name = parts.slice(0,parts.length-1).join(".");
    this.extension = parts[parts.length-1];
    this.writeChunkSize = writeChunkSize;
    this.merge = merge;
    this.fileCount = 0;
    this.writeCount = 0;
    this.charCount = 0;
    this.records = [];
    this.converter = new Converter();
  }

  preProcess() {
    const text = this.converter.preProcess();
    if (text) this.records.push(text);
  }

  recordProcess(record) {
    const text = this.converter.recordProcess(record);
    if (text) {
      if ((this.charCount + text.length) > this.writeChunkSize) {
        this.write();
      }
      this.charCount += text.length
      this.records.push(text);
    }
  }

  postProcess() {
    const text = this.converter.postProcess();
    if (text) this.records.push(text);
    if (!this.merge) this.write(this.defaultName(this.writeCount == 0));
    this.fileCount += 1;
  }

  terminate() {
    const text = this.converter.terminate();
    if (text) this.records.push(text);
    if (this.merge) this.write(this.defaultName(this.writeCount == 0));
  }

  defaultName(isSingleFile) {
    if (isSingleFile) {
      return `${this.name}.${this.extension}`
    } else {
      return `${this.name}${this.writeCount}.${this.extension}`
    }
  }

  write(fileName = this.defaultName()) {
    if (this.records[0]) {
      const blob = new Blob(this.records, {type: 'text/plain'});
      this.download(blob, fileName);
      this.writeCount += 1;
      this.charCount = 0;
      this.records = [];
    }
  }

  download(blob, fileName) {
    const url = URL.createObjectURL(blob, fileName);
    const a = document.createElement("a");
    document.body.appendChild(a);
    a.download = fileName;
    a.href = url;
    a.click();
    a.remove();
    URL.revokeObjectURL(url);
  }
}
</script>
タイトルとURLをコピーしました